Skip to main content

mz_clusterd_test_driver/
persist_host.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Hosts the persist PubSub server and a `PersistClientCache` wired to it.
11//! This is the mechanism's persist access layer; it embeds no workload.
12
13use std::net::{IpAddr, Ipv4Addr, SocketAddr};
14use std::sync::Arc;
15
16use mz_ore::metrics::MetricsRegistry;
17use mz_persist_client::PersistClient;
18use mz_persist_client::cache::PersistClientCache;
19use mz_persist_client::cfg::PersistConfig;
20use mz_persist_client::rpc::PersistGrpcPubSubServer;
21use mz_persist_types::PersistLocation;
22use tokio::net::TcpListener;
23use tokio_stream::wrappers::TcpListenerStream;
24
25/// Hosts persist infrastructure for the driver: a PubSub server plus a client
26/// cache configured to use it. `clusterd` connects to `pubsub_url()`.
27pub struct PersistHost {
28    cache: Arc<PersistClientCache>,
29    pubsub_port: u16,
30    location: PersistLocation,
31}
32
33impl PersistHost {
34    /// Starts a PubSub server on an ephemeral localhost port (for unit tests).
35    pub async fn start(location: PersistLocation) -> anyhow::Result<Self> {
36        Self::start_on(
37            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
38            location,
39        )
40        .await
41    }
42
43    /// Starts a PubSub server bound to `bind` and builds a cache wired to it,
44    /// using the given blob/consensus location for all shards. mzcompose passes
45    /// a fixed routable bind (e.g. `0.0.0.0:6879`) so clusterd can be configured
46    /// with the driver's PubSub URL at its own startup.
47    pub async fn start_on(bind: SocketAddr, location: PersistLocation) -> anyhow::Result<Self> {
48        let registry = MetricsRegistry::new();
49        let persist_cfg = PersistConfig::new_default_configs(
50            &mz_persist_client::BUILD_INFO,
51            mz_ore::now::SYSTEM_TIME.clone(),
52        );
53
54        let server = PersistGrpcPubSubServer::new(&persist_cfg, &registry);
55        let conn = server.new_same_process_connection();
56
57        let listener = TcpListener::bind(bind).await?;
58        let pubsub_port = listener.local_addr()?.port();
59        mz_ore::task::spawn(|| "persist_pubsub_server", async move {
60            server
61                .serve_with_stream(TcpListenerStream::new(listener))
62                .await
63                .expect("pubsub server");
64        });
65
66        let cache = PersistClientCache::new(persist_cfg, &registry, |_cfg, _metrics| conn);
67        Ok(PersistHost {
68            cache: Arc::new(cache),
69            pubsub_port,
70            location,
71        })
72    }
73
74    /// The port the PubSub server is listening on.
75    ///
76    /// Prefer this for the container/mzcompose case, where `clusterd` reaches
77    /// the driver by service hostname: the composition builds the URL from the
78    /// driver's service name plus this port.
79    pub fn pubsub_port(&self) -> u16 {
80        self.pubsub_port
81    }
82
83    /// A loopback PubSub URL, valid only for same-host use (local iteration,
84    /// unit tests). For cross-container use, build the URL from the driver's
85    /// routable hostname and [`PersistHost::pubsub_port`] instead — this method
86    /// always returns `127.0.0.1` regardless of the bind address.
87    pub fn pubsub_url(&self) -> String {
88        format!("http://127.0.0.1:{}", self.pubsub_port)
89    }
90
91    /// The shared blob/consensus location to embed in shard metadata.
92    pub fn location(&self) -> &PersistLocation {
93        &self.location
94    }
95
96    /// Opens a persist client against the hosted location.
97    pub async fn client(&self) -> anyhow::Result<PersistClient> {
98        Ok(self.cache.open(self.location.clone()).await?)
99    }
100}
101
102#[cfg(test)]
103mod tests {
104    use super::*;
105
106    /// Returns a file-backed location plus the `TempDir` guard, which the
107    /// caller must keep alive for the duration of the test so the blob
108    /// directory is not cleaned up while in use.
109    fn file_location() -> (PersistLocation, tempfile::TempDir) {
110        let dir = tempfile::tempdir().expect("tempdir");
111        let blob = format!("file://{}", dir.path().display());
112        let consensus = std::env::var("COCKROACH_URL").unwrap_or_else(|_| {
113            "postgres://root@127.0.0.1:26257?options=--search_path=mz_driver".to_string()
114        });
115        let location = PersistLocation {
116            blob_uri: blob.parse().expect("blob uri"),
117            consensus_uri: consensus.parse().expect("consensus uri"),
118        };
119        (location, dir)
120    }
121
122    #[mz_ore::test(tokio::test)]
123    #[cfg_attr(miri, ignore)]
124    async fn host_starts_and_opens_client() {
125        if std::env::var("COCKROACH_URL").is_err() {
126            return; // requires a running CockroachDB; skip otherwise
127        }
128        let (location, _dir) = file_location();
129        let host = PersistHost::start(location).await.expect("host");
130        assert!(host.pubsub_url().starts_with("http://127.0.0.1:"));
131
132        // The PubSub server must actually be listening on the reported port.
133        tokio::net::TcpStream::connect(("127.0.0.1", host.pubsub_port()))
134            .await
135            .expect("pubsub server listening");
136
137        let _client = host.client().await.expect("client");
138    }
139}