mz_clusterd_test_driver/
persist_host.rs1use std::net::{IpAddr, Ipv4Addr, SocketAddr};
14use std::sync::Arc;
15
16use mz_ore::metrics::MetricsRegistry;
17use mz_persist_client::PersistClient;
18use mz_persist_client::cache::PersistClientCache;
19use mz_persist_client::cfg::PersistConfig;
20use mz_persist_client::rpc::PersistGrpcPubSubServer;
21use mz_persist_types::PersistLocation;
22use tokio::net::TcpListener;
23use tokio_stream::wrappers::TcpListenerStream;
24
25pub struct PersistHost {
28 cache: Arc<PersistClientCache>,
29 pubsub_port: u16,
30 location: PersistLocation,
31}
32
33impl PersistHost {
34 pub async fn start(location: PersistLocation) -> anyhow::Result<Self> {
36 Self::start_on(
37 SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
38 location,
39 )
40 .await
41 }
42
43 pub async fn start_on(bind: SocketAddr, location: PersistLocation) -> anyhow::Result<Self> {
48 let registry = MetricsRegistry::new();
49 let persist_cfg = PersistConfig::new_default_configs(
50 &mz_persist_client::BUILD_INFO,
51 mz_ore::now::SYSTEM_TIME.clone(),
52 );
53
54 let server = PersistGrpcPubSubServer::new(&persist_cfg, ®istry);
55 let conn = server.new_same_process_connection();
56
57 let listener = TcpListener::bind(bind).await?;
58 let pubsub_port = listener.local_addr()?.port();
59 mz_ore::task::spawn(|| "persist_pubsub_server", async move {
60 server
61 .serve_with_stream(TcpListenerStream::new(listener))
62 .await
63 .expect("pubsub server");
64 });
65
66 let cache = PersistClientCache::new(persist_cfg, ®istry, |_cfg, _metrics| conn);
67 Ok(PersistHost {
68 cache: Arc::new(cache),
69 pubsub_port,
70 location,
71 })
72 }
73
74 pub fn pubsub_port(&self) -> u16 {
80 self.pubsub_port
81 }
82
83 pub fn pubsub_url(&self) -> String {
88 format!("http://127.0.0.1:{}", self.pubsub_port)
89 }
90
91 pub fn location(&self) -> &PersistLocation {
93 &self.location
94 }
95
96 pub async fn client(&self) -> anyhow::Result<PersistClient> {
98 Ok(self.cache.open(self.location.clone()).await?)
99 }
100}
101
102#[cfg(test)]
103mod tests {
104 use super::*;
105
106 fn file_location() -> (PersistLocation, tempfile::TempDir) {
110 let dir = tempfile::tempdir().expect("tempdir");
111 let blob = format!("file://{}", dir.path().display());
112 let consensus = std::env::var("COCKROACH_URL").unwrap_or_else(|_| {
113 "postgres://root@127.0.0.1:26257?options=--search_path=mz_driver".to_string()
114 });
115 let location = PersistLocation {
116 blob_uri: blob.parse().expect("blob uri"),
117 consensus_uri: consensus.parse().expect("consensus uri"),
118 };
119 (location, dir)
120 }
121
122 #[mz_ore::test(tokio::test)]
123 #[cfg_attr(miri, ignore)]
124 async fn host_starts_and_opens_client() {
125 if std::env::var("COCKROACH_URL").is_err() {
126 return; }
128 let (location, _dir) = file_location();
129 let host = PersistHost::start(location).await.expect("host");
130 assert!(host.pubsub_url().starts_with("http://127.0.0.1:"));
131
132 tokio::net::TcpStream::connect(("127.0.0.1", host.pubsub_port()))
134 .await
135 .expect("pubsub server listening");
136
137 let _client = host.client().await.expect("client");
138 }
139}