Skip to main content

mz_clusterd_test_driver/
ctp.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Compute CTP connection and the `Hello` step of the controller handshake.
11//! Generic: it sends any `ComputeCommand` and receives any `ComputeResponse`.
12//!
13//! Only `Hello` (the transport/version step) happens here; the controller
14//! handshake proper — `CreateInstance`, `UpdateConfiguration`,
15//! `InitializationComplete` — is driven by explicit caller commands, so the
16//! caller controls the instance config, the peek-stash setting, and exactly when
17//! the reconciliation window opens and closes.
18
19use std::time::Duration;
20
21use mz_compute_client::protocol::command::ComputeCommand;
22use mz_compute_client::protocol::response::ComputeResponse;
23use mz_service::client::GenericClient;
24use mz_service::transport::{Client, NoopMetrics};
25use uuid::Uuid;
26
/// CTP transport client specialized to the compute protocol: it sends
/// `ComputeCommand`s and receives `ComputeResponse`s. This is the client type
/// returned by `connect_and_hello`.
pub type ComputeCtpClient = Client<ComputeCommand, ComputeResponse>;
28
29/// Connects to a clusterd compute controller address and sends `Hello`, leaving
30/// the controller handshake (`CreateInstance` onward) to the caller.
31///
32/// A reconnect re-runs exactly this: a fresh transport connection plus `Hello`.
33/// The reconciliation window then opens when the script sends `CreateInstance`
34/// and closes when it sends `InitializationComplete`; in between, the replica
35/// reconciles the replayed dataflows against its live ones rather than
36/// rehydrating.
37pub async fn connect_and_hello(compute_addr: &str) -> anyhow::Result<ComputeCtpClient> {
38    // Use persist-client's BUILD_INFO: it is release-versioned (synced by
39    // bin/bump-version), so it matches the clusterd we connect to. Our own
40    // crate is `0.0.0`, which would fail the handshake's version check.
41    let version = mz_persist_client::BUILD_INFO.semver_version();
42    let mut client = Client::<ComputeCommand, ComputeResponse>::connect(
43        compute_addr,
44        version,
45        Duration::from_secs(30),
46        Duration::from_secs(60),
47        NoopMetrics,
48    )
49    .await?;
50
51    client
52        .send(ComputeCommand::Hello {
53            nonce: Uuid::new_v4(),
54        })
55        .await?;
56
57    Ok(client)
58}
59
#[cfg(test)]
mod tests {
    use super::*;
    use crate::target;

    /// End-to-end check that `recv` does not report an error right after
    /// `Hello`.
    ///
    /// The test returns early (and passes) unless `target::e2e_enabled()` is
    /// true. Otherwise it connects via `connect_and_hello` and waits up to
    /// 500 ms on `recv`. Timing out with nothing received is acceptable, as is
    /// a successful `recv`; only a `recv` error fails the test.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn hello_holds_connection() {
        if !target::e2e_enabled() {
            return;
        }
        let mut client = connect_and_hello(&target::compute_addr())
            .await
            .expect("hello");
        let outcome = tokio::time::timeout(Duration::from_millis(500), client.recv()).await;
        // The outer `Err` means the timeout elapsed before `recv` completed,
        // which is fine. Match the failing case directly so the underlying
        // error appears in the panic message instead of a bare
        // "assertion failed".
        if let Ok(Err(err)) = outcome {
            panic!("recv returned an error within 500ms of Hello: {err:?}");
        }
    }
}
77}