1use std::collections::BTreeMap;
11use std::error::Error;
12use std::future::IntoFuture;
13use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
14use std::path::{Path, PathBuf};
15use std::pin::Pin;
16use std::str::FromStr;
17use std::sync::Arc;
18use std::sync::LazyLock;
19use std::time::Duration;
20use std::{env, fs, iter};
21
22use anyhow::anyhow;
23use futures::Future;
24use futures::future::{BoxFuture, LocalBoxFuture};
25use headers::{Header, HeaderMapExt};
26use http::Uri;
27use hyper::http::header::HeaderMap;
28use maplit::btreemap;
29use mz_adapter::TimestampExplanation;
30use mz_adapter_types::bootstrap_builtin_cluster_config::{
31 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
32 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
33 SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
34};
35
36use mz_auth::password::Password;
37use mz_catalog::config::ClusterReplicaSizeMap;
38use mz_controller::ControllerConfig;
39use mz_dyncfg::ConfigUpdates;
40use mz_license_keys::ValidatedLicenseKey;
41use mz_orchestrator_process::{ProcessOrchestrator, ProcessOrchestratorConfig};
42use mz_orchestrator_tracing::{TracingCliArgs, TracingOrchestrator};
43use mz_ore::metrics::MetricsRegistry;
44use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
45use mz_ore::retry::Retry;
46use mz_ore::task;
47use mz_ore::tracing::{
48 OpenTelemetryConfig, StderrLogConfig, StderrLogFormat, TracingConfig, TracingHandle,
49};
50use mz_persist_client::PersistLocation;
51use mz_persist_client::cache::PersistClientCache;
52use mz_persist_client::cfg::{CONSENSUS_CONNECTION_POOL_MAX_SIZE, PersistConfig};
53use mz_persist_client::rpc::PersistGrpcPubSubServer;
54use mz_secrets::SecretsController;
55use mz_server_core::listeners::{
56 AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpRoutesEnabled,
57};
58use mz_server_core::{ReloadTrigger, TlsCertConfig};
59use mz_sql::catalog::EnvironmentId;
60use mz_storage_types::connections::ConnectionContext;
61use mz_tracing::CloneableEnvFilter;
62use openssl::asn1::Asn1Time;
63use openssl::error::ErrorStack;
64use openssl::hash::MessageDigest;
65use openssl::nid::Nid;
66use openssl::pkey::{PKey, Private};
67use openssl::rsa::Rsa;
68use openssl::ssl::{SslConnector, SslConnectorBuilder, SslMethod, SslOptions};
69use openssl::x509::extension::{BasicConstraints, SubjectAlternativeName};
70use openssl::x509::{X509, X509Name, X509NameBuilder};
71use postgres::error::DbError;
72use postgres::tls::{MakeTlsConnect, TlsConnect};
73use postgres::types::{FromSql, Type};
74use postgres::{NoTls, Socket};
75use postgres_openssl::MakeTlsConnector;
76use tempfile::TempDir;
77use tokio::net::TcpListener;
78use tokio::runtime::Runtime;
79use tokio_postgres::config::{Host, SslMode};
80use tokio_postgres::{AsyncMessage, Client};
81use tokio_stream::wrappers::TcpListenerStream;
82use tower_http::cors::AllowOrigin;
83use tracing::Level;
84use tracing_capture::SharedStorage;
85use tracing_subscriber::EnvFilter;
86use tungstenite::stream::MaybeTlsStream;
87use tungstenite::{Message, WebSocket};
88
89use crate::{
90 CatalogConfig, FronteggAuthenticator, HttpListenerConfig, ListenersConfig, SqlListenerConfig,
91 WebSocketAuth, WebSocketResponse,
92};
93
/// Kafka address string used by tests, resolved lazily on first access.
///
/// The value is taken from the `KAFKA_ADDRS` environment variable; if that
/// variable cannot be read, `localhost:9092` is used instead.
pub static KAFKA_ADDRS: LazyLock<String> = LazyLock::new(|| match env::var("KAFKA_ADDRS") {
    Ok(addrs) => addrs,
    Err(_) => String::from("localhost:9092"),
});
96
/// Builder-style configuration for launching an `environmentd` test server.
///
/// Start from [`TestHarness::default`], adjust it with the consuming builder
/// methods, then call one of the `start*` methods to obtain a running server.
#[derive(Clone)]
pub struct TestHarness {
    /// Directory for server data. When `None`, a temporary directory is
    /// created at serve time and kept alive by the returned server.
    data_directory: Option<PathBuf>,
    /// TLS certificate/key paths, forwarded to the server configuration.
    tls: Option<TlsCertConfig>,
    /// Frontegg authenticator, forwarded to the server configuration.
    frontegg: Option<FronteggAuthenticator>,
    /// Password for external `mz_system` logins, forwarded to the server
    /// configuration.
    external_login_password_mz_system: Option<Password>,
    /// SQL and HTTP listeners to bind, keyed by listener name.
    listeners_config: ListenersConfig,
    /// Forwarded to the server configuration as `unsafe_mode`.
    unsafe_mode: bool,
    /// Worker count as set by the `workers` builder method.
    // NOTE(review): not read anywhere in the visible part of this file.
    workers: usize,
    /// Clock handed to the server and controller.
    now: NowFn,
    /// Suffix for the `consensus_{seed}` and `tsoracle_{seed}` schemas created
    /// in the metadata backend.
    seed: u32,
    /// Forwarded to the server configuration under the same name.
    storage_usage_collection_interval: Duration,
    /// Forwarded to the server configuration under the same name.
    storage_usage_retention_period: Option<Duration>,
    /// Forwarded as `bootstrap_default_cluster_replica_size`.
    default_cluster_replica_size: String,
    /// Forwarded as `bootstrap_default_cluster_replication_factor`.
    default_cluster_replication_factor: u32,
    /// Bootstrap size/replication factor for the builtin system cluster.
    builtin_system_cluster_config: BootstrapBuiltinClusterConfig,
    /// Bootstrap size/replication factor for the builtin catalog server cluster.
    builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig,
    /// Bootstrap size/replication factor for the builtin probe cluster.
    builtin_probe_cluster_config: BootstrapBuiltinClusterConfig,
    /// Bootstrap size/replication factor for the builtin support cluster.
    builtin_support_cluster_config: BootstrapBuiltinClusterConfig,
    /// Bootstrap size/replication factor for the builtin analytics cluster.
    builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig,

    /// Forwarded to the process orchestrator as `propagate_crashes`.
    propagate_crashes: bool,
    /// When `true`, tracing is configured at serve time; otherwise a disabled
    /// tracing handle is used.
    enable_tracing: bool,
    /// Tracing arguments handed to the tracing orchestrator wrapper.
    orchestrator_tracing_cli_args: TracingCliArgs,
    /// Forwarded to the server configuration as `bootstrap_role`.
    bootstrap_role: Option<String>,
    /// Forwarded to the controller configuration as `deploy_generation`.
    deploy_generation: u64,
    /// System parameter defaults (name to value) forwarded to the server.
    system_parameter_defaults: BTreeMap<String, String>,
    /// Forwarded to the server configuration under the same name.
    internal_console_redirect_url: Option<String>,
    /// Metrics registry to use; a fresh one is created at serve time if `None`.
    metrics_registry: Option<MetricsRegistry>,
    /// Version written into the persist configuration's `build_version`.
    code_version: semver::Version,
    /// Capture storage passed to the tracing configuration when tracing is
    /// enabled.
    capture: Option<SharedStorage>,
    /// Environment ID used by the orchestrator and the server.
    pub environment_id: EnvironmentId,
}
133
134impl Default for TestHarness {
135 fn default() -> TestHarness {
136 TestHarness {
137 data_directory: None,
138 tls: None,
139 frontegg: None,
140 external_login_password_mz_system: None,
141 listeners_config: ListenersConfig {
142 sql: btreemap![
143 "external".to_owned() => SqlListenerConfig {
144 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
145 authenticator_kind: AuthenticatorKind::None,
146 allowed_roles: AllowedRoles::Normal,
147 enable_tls: false,
148 },
149 "internal".to_owned() => SqlListenerConfig {
150 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
151 authenticator_kind: AuthenticatorKind::None,
152 allowed_roles: AllowedRoles::NormalAndInternal,
153 enable_tls: false,
154 },
155 ],
156 http: btreemap![
157 "external".to_owned() => HttpListenerConfig {
158 base: BaseListenerConfig {
159 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
160 authenticator_kind: AuthenticatorKind::None,
161 allowed_roles: AllowedRoles::Normal,
162 enable_tls: false,
163 },
164 routes: HttpRoutesEnabled{
165 base: true,
166 webhook: true,
167 internal: false,
168 metrics: false,
169 profiling: false,
170 mcp_agent: false,
171 mcp_developer: false,
172 console_config: true,
173 },
174 },
175 "internal".to_owned() => HttpListenerConfig {
176 base: BaseListenerConfig {
177 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
178 authenticator_kind: AuthenticatorKind::None,
179 allowed_roles: AllowedRoles::NormalAndInternal,
180 enable_tls: false,
181 },
182 routes: HttpRoutesEnabled{
183 base: true,
184 webhook: true,
185 internal: true,
186 metrics: true,
187 profiling: true,
188 mcp_agent: false,
189 mcp_developer: false,
190 console_config: true,
191 },
192 },
193 ],
194 },
195 unsafe_mode: false,
196 workers: 1,
197 now: SYSTEM_TIME.clone(),
198 seed: rand::random(),
199 storage_usage_collection_interval: Duration::from_secs(3600),
200 storage_usage_retention_period: None,
201 default_cluster_replica_size: "scale=1,workers=1".to_string(),
202 default_cluster_replication_factor: 1,
203 builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
204 size: "scale=1,workers=1".to_string(),
205 replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
206 },
207 builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
208 size: "scale=1,workers=1".to_string(),
209 replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
210 },
211 builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
212 size: "scale=1,workers=1".to_string(),
213 replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
214 },
215 builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
216 size: "scale=1,workers=1".to_string(),
217 replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
218 },
219 builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
220 size: "scale=1,workers=1".to_string(),
221 replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
222 },
223 propagate_crashes: false,
224 enable_tracing: false,
225 bootstrap_role: Some("materialize".into()),
226 deploy_generation: 0,
227 system_parameter_defaults: BTreeMap::from([(
230 "log_filter".to_string(),
231 "error".to_string(),
232 )]),
233 internal_console_redirect_url: None,
234 metrics_registry: None,
235 orchestrator_tracing_cli_args: TracingCliArgs {
236 startup_log_filter: CloneableEnvFilter::from_str("error").expect("must parse"),
237 ..Default::default()
238 },
239 code_version: crate::BUILD_INFO.semver_version(),
240 environment_id: EnvironmentId::for_tests(),
241 capture: None,
242 }
243 }
244}
245
246impl TestHarness {
247 pub async fn start(self) -> TestServer {
251 self.try_start().await.expect("Failed to start test Server")
252 }
253
254 pub async fn start_with_trigger(self, tls_reload_certs: ReloadTrigger) -> TestServer {
256 self.try_start_with_trigger(tls_reload_certs)
257 .await
258 .expect("Failed to start test Server")
259 }
260
261 pub async fn try_start(self) -> Result<TestServer, anyhow::Error> {
263 self.try_start_with_trigger(mz_server_core::cert_reload_never_reload())
264 .await
265 }
266
267 pub async fn try_start_with_trigger(
269 self,
270 tls_reload_certs: ReloadTrigger,
271 ) -> Result<TestServer, anyhow::Error> {
272 let listeners = Listeners::new(&self).await?;
273 listeners.serve_with_trigger(self, tls_reload_certs).await
274 }
275
276 pub fn start_blocking(self) -> TestServerWithRuntime {
278 let runtime = tokio::runtime::Builder::new_multi_thread()
279 .enable_all()
280 .thread_stack_size(mz_ore::stack::STACK_SIZE)
281 .build()
282 .expect("failed to spawn runtime for test");
283 let runtime = Arc::new(runtime);
284 let server = runtime.block_on(self.start());
285 TestServerWithRuntime { runtime, server }
286 }
287
288 pub fn data_directory(mut self, data_directory: impl Into<PathBuf>) -> Self {
289 self.data_directory = Some(data_directory.into());
290 self
291 }
292
293 pub fn with_tls(mut self, cert_path: impl Into<PathBuf>, key_path: impl Into<PathBuf>) -> Self {
294 self.tls = Some(TlsCertConfig {
295 cert: cert_path.into(),
296 key: key_path.into(),
297 });
298 for (_, listener) in &mut self.listeners_config.sql {
299 listener.enable_tls = true;
300 }
301 for (_, listener) in &mut self.listeners_config.http {
302 listener.base.enable_tls = true;
303 }
304 self
305 }
306
307 pub fn unsafe_mode(mut self) -> Self {
308 self.unsafe_mode = true;
309 self
310 }
311
312 pub fn workers(mut self, workers: usize) -> Self {
313 self.workers = workers;
314 self
315 }
316
317 pub fn with_frontegg_auth(mut self, frontegg: &FronteggAuthenticator) -> Self {
318 self.frontegg = Some(frontegg.clone());
319 let enable_tls = self.tls.is_some();
320 self.listeners_config = ListenersConfig {
321 sql: btreemap! {
322 "external".to_owned() => SqlListenerConfig {
323 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
324 authenticator_kind: AuthenticatorKind::Frontegg,
325 allowed_roles: AllowedRoles::Normal,
326 enable_tls,
327 },
328 "internal".to_owned() => SqlListenerConfig {
329 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
330 authenticator_kind: AuthenticatorKind::None,
331 allowed_roles: AllowedRoles::NormalAndInternal,
332 enable_tls: false,
333 },
334 },
335 http: btreemap! {
336 "external".to_owned() => HttpListenerConfig {
337 base: BaseListenerConfig {
338 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
339 authenticator_kind: AuthenticatorKind::Frontegg,
340 allowed_roles: AllowedRoles::Normal,
341 enable_tls,
342 },
343 routes: HttpRoutesEnabled{
344 base: true,
345 webhook: true,
346 internal: false,
347 metrics: false,
348 profiling: false,
349 mcp_agent: false,
350 mcp_developer: false,
351 console_config: true,
352 },
353 },
354 "internal".to_owned() => HttpListenerConfig {
355 base: BaseListenerConfig {
356 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
357 authenticator_kind: AuthenticatorKind::None,
358 allowed_roles: AllowedRoles::NormalAndInternal,
359 enable_tls: false,
360 },
361 routes: HttpRoutesEnabled{
362 base: true,
363 webhook: true,
364 internal: true,
365 metrics: true,
366 profiling: true,
367 mcp_agent: false,
368 mcp_developer: false,
369 console_config: true,
370 },
371 },
372 },
373 };
374 self
375 }
376
377 pub fn with_oidc_auth(
378 mut self,
379 issuer: Option<String>,
380 authentication_claim: Option<String>,
381 expected_audiences: Option<Vec<String>>,
382 ) -> Self {
383 let enable_tls = self.tls.is_some();
384 self.listeners_config = ListenersConfig {
385 sql: btreemap! {
386 "external".to_owned() => SqlListenerConfig {
387 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
388 authenticator_kind: AuthenticatorKind::Oidc,
389 allowed_roles: AllowedRoles::Normal,
390 enable_tls,
391 },
392 "internal".to_owned() => SqlListenerConfig {
393 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
394 authenticator_kind: AuthenticatorKind::None,
395 allowed_roles: AllowedRoles::NormalAndInternal,
396 enable_tls: false,
397 },
398 },
399 http: btreemap! {
400 "external".to_owned() => HttpListenerConfig {
401 base: BaseListenerConfig {
402 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
403 authenticator_kind: AuthenticatorKind::Oidc,
404 allowed_roles: AllowedRoles::Normal,
405 enable_tls,
406 },
407 routes: HttpRoutesEnabled{
408 base: true,
409 webhook: true,
410 internal: false,
411 metrics: false,
412 profiling: false,
413 mcp_agent: false,
414 mcp_developer: false,
415 console_config: true,
416 },
417 },
418 "internal".to_owned() => HttpListenerConfig {
419 base: BaseListenerConfig {
420 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
421 authenticator_kind: AuthenticatorKind::None,
422 allowed_roles: AllowedRoles::NormalAndInternal,
423 enable_tls: false,
424 },
425 routes: HttpRoutesEnabled{
426 base: true,
427 webhook: true,
428 internal: true,
429 metrics: true,
430 profiling: true,
431 mcp_agent: false,
432 mcp_developer: false,
433 console_config: true,
434 },
435 },
436 },
437 };
438
439 if let Some(issuer) = issuer {
440 self.system_parameter_defaults
441 .insert("oidc_issuer".to_string(), issuer);
442 }
443
444 if let Some(authentication_claim) = authentication_claim {
445 self.system_parameter_defaults.insert(
446 "oidc_authentication_claim".to_string(),
447 authentication_claim,
448 );
449 }
450
451 if let Some(expected_audiences) = expected_audiences {
452 self.system_parameter_defaults.insert(
453 "oidc_audience".to_string(),
454 serde_json::to_string(&expected_audiences).unwrap(),
455 );
456 }
457
458 self
459 }
460
461 pub fn with_password_auth(mut self, mz_system_password: Password) -> Self {
462 self.external_login_password_mz_system = Some(mz_system_password);
463 let enable_tls = self.tls.is_some();
464 self.listeners_config = ListenersConfig {
465 sql: btreemap! {
466 "external".to_owned() => SqlListenerConfig {
467 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
468 authenticator_kind: AuthenticatorKind::Password,
469 allowed_roles: AllowedRoles::NormalAndInternal,
470 enable_tls,
471 },
472 },
473 http: btreemap! {
474 "external".to_owned() => HttpListenerConfig {
475 base: BaseListenerConfig {
476 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
477 authenticator_kind: AuthenticatorKind::Password,
478 allowed_roles: AllowedRoles::NormalAndInternal,
479 enable_tls,
480 },
481 routes: HttpRoutesEnabled{
482 base: true,
483 webhook: true,
484 internal: true,
485 metrics: false,
486 profiling: true,
487 mcp_agent: false,
488 mcp_developer: false,
489 console_config: true,
490 },
491 },
492 "metrics".to_owned() => HttpListenerConfig {
493 base: BaseListenerConfig {
494 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
495 authenticator_kind: AuthenticatorKind::None,
496 allowed_roles: AllowedRoles::NormalAndInternal,
497 enable_tls: false,
498 },
499 routes: HttpRoutesEnabled{
500 base: false,
501 webhook: false,
502 internal: false,
503 metrics: true,
504 profiling: false,
505 mcp_agent: false,
506 mcp_developer: false,
507 console_config: true,
508 },
509 },
510 },
511 };
512 self
513 }
514
515 pub fn with_sasl_scram_auth(mut self, mz_system_password: Password) -> Self {
516 self.external_login_password_mz_system = Some(mz_system_password);
517 let enable_tls = self.tls.is_some();
518 self.listeners_config = ListenersConfig {
519 sql: btreemap! {
520 "external".to_owned() => SqlListenerConfig {
521 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
522 authenticator_kind: AuthenticatorKind::Sasl,
523 allowed_roles: AllowedRoles::NormalAndInternal,
524 enable_tls,
525 },
526 },
527 http: btreemap! {
528 "external".to_owned() => HttpListenerConfig {
529 base: BaseListenerConfig {
530 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
531 authenticator_kind: AuthenticatorKind::Password,
532 allowed_roles: AllowedRoles::NormalAndInternal,
533 enable_tls,
534 },
535 routes: HttpRoutesEnabled{
536 base: true,
537 webhook: true,
538 internal: true,
539 metrics: false,
540 profiling: true,
541 mcp_agent: false,
542 mcp_developer: false,
543 console_config: true,
544 },
545 },
546 "metrics".to_owned() => HttpListenerConfig {
547 base: BaseListenerConfig {
548 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
549 authenticator_kind: AuthenticatorKind::None,
550 allowed_roles: AllowedRoles::NormalAndInternal,
551 enable_tls: false,
552 },
553 routes: HttpRoutesEnabled{
554 base: false,
555 webhook: false,
556 internal: false,
557 metrics: true,
558 profiling: false,
559 mcp_agent: false,
560 mcp_developer: false,
561 console_config: true,
562 },
563 },
564 },
565 };
566 self
567 }
568
569 pub fn with_now(mut self, now: NowFn) -> Self {
570 self.now = now;
571 self
572 }
573
574 pub fn with_storage_usage_collection_interval(
575 mut self,
576 storage_usage_collection_interval: Duration,
577 ) -> Self {
578 self.storage_usage_collection_interval = storage_usage_collection_interval;
579 self
580 }
581
582 pub fn with_storage_usage_retention_period(
583 mut self,
584 storage_usage_retention_period: Duration,
585 ) -> Self {
586 self.storage_usage_retention_period = Some(storage_usage_retention_period);
587 self
588 }
589
590 pub fn with_default_cluster_replica_size(
591 mut self,
592 default_cluster_replica_size: String,
593 ) -> Self {
594 self.default_cluster_replica_size = default_cluster_replica_size;
595 self
596 }
597
598 pub fn with_builtin_system_cluster_replica_size(
599 mut self,
600 builtin_system_cluster_replica_size: String,
601 ) -> Self {
602 self.builtin_system_cluster_config.size = builtin_system_cluster_replica_size;
603 self
604 }
605
606 pub fn with_builtin_system_cluster_replication_factor(
607 mut self,
608 builtin_system_cluster_replication_factor: u32,
609 ) -> Self {
610 self.builtin_system_cluster_config.replication_factor =
611 builtin_system_cluster_replication_factor;
612 self
613 }
614
615 pub fn with_builtin_catalog_server_cluster_replica_size(
616 mut self,
617 builtin_catalog_server_cluster_replica_size: String,
618 ) -> Self {
619 self.builtin_catalog_server_cluster_config.size =
620 builtin_catalog_server_cluster_replica_size;
621 self
622 }
623
624 pub fn with_propagate_crashes(mut self, propagate_crashes: bool) -> Self {
625 self.propagate_crashes = propagate_crashes;
626 self
627 }
628
629 pub fn with_enable_tracing(mut self, enable_tracing: bool) -> Self {
630 self.enable_tracing = enable_tracing;
631 self
632 }
633
634 pub fn with_bootstrap_role(mut self, bootstrap_role: Option<String>) -> Self {
635 self.bootstrap_role = bootstrap_role;
636 self
637 }
638
639 pub fn with_deploy_generation(mut self, deploy_generation: u64) -> Self {
640 self.deploy_generation = deploy_generation;
641 self
642 }
643
644 pub fn with_system_parameter_default(mut self, param: String, value: String) -> Self {
645 self.system_parameter_defaults.insert(param, value);
646 self
647 }
648
649 pub fn with_mcp_routes(mut self, agent: bool, developer: bool) -> Self {
650 for config in self.listeners_config.http.values_mut() {
651 config.routes.mcp_agent = agent;
652 config.routes.mcp_developer = developer;
653 }
654 self
655 }
656
657 pub fn with_internal_console_redirect_url(
658 mut self,
659 internal_console_redirect_url: Option<String>,
660 ) -> Self {
661 self.internal_console_redirect_url = internal_console_redirect_url;
662 self
663 }
664
665 pub fn with_metrics_registry(mut self, registry: MetricsRegistry) -> Self {
666 self.metrics_registry = Some(registry);
667 self
668 }
669
670 pub fn with_code_version(mut self, version: semver::Version) -> Self {
671 self.code_version = version;
672 self
673 }
674
675 pub fn with_capture(mut self, storage: SharedStorage) -> Self {
676 self.capture = Some(storage);
677 self
678 }
679}
680
/// Listeners that have been bound for a test server but are not serving yet.
///
/// Created by [`Listeners::new`] and consumed by [`Listeners::serve`] or
/// [`Listeners::serve_with_trigger`].
pub struct Listeners {
    /// The underlying bound listeners from the crate root.
    pub inner: crate::Listeners,
}
684
685impl Listeners {
686 pub async fn new(config: &TestHarness) -> Result<Listeners, anyhow::Error> {
687 let inner = crate::Listeners::bind(config.listeners_config.clone()).await?;
688 Ok(Listeners { inner })
689 }
690
691 pub async fn serve(self, config: TestHarness) -> Result<TestServer, anyhow::Error> {
692 self.serve_with_trigger(config, mz_server_core::cert_reload_never_reload())
693 .await
694 }
695
    /// Starts an `environmentd` server on the already-bound listeners using
    /// `config`, with `tls_reload_certs` as the TLS certificate reload trigger.
    ///
    /// In order, this:
    /// 1. picks the data directory (a fresh temporary one if none was
    ///    configured) and creates a scratch directory;
    /// 2. connects to the database named by the `METADATA_BACKEND_URL`
    ///    environment variable and creates the per-seed `consensus_{seed}` and
    ///    `tsoracle_{seed}` schemas;
    /// 3. creates the process orchestrator, the persist configuration, an
    ///    in-process persist pubsub server, and the persist client cache;
    /// 4. configures tracing if requested; and
    /// 5. serves the inner listeners with the assembled configuration.
    ///
    /// # Errors
    ///
    /// Returns an error if a temporary directory cannot be created, if
    /// `METADATA_BACKEND_URL` is not set, if connecting to the database or
    /// creating the schemas fails, if the current executable path cannot be
    /// determined, if the orchestrator or tracing setup fails, or if the inner
    /// server fails to start.
    ///
    /// # Panics
    ///
    /// Panics if the derived consensus, timestamp oracle, or blob URIs do not
    /// parse, if the current executable has no grandparent directory, or if
    /// the pubsub listener cannot be bound. Indexing `http["external"]` also
    /// requires an HTTP listener named `external` to be configured.
    pub async fn serve_with_trigger(
        self,
        config: TestHarness,
        tls_reload_certs: ReloadTrigger,
    ) -> Result<TestServer, anyhow::Error> {
        let (data_directory, temp_dir) = match config.data_directory {
            None => {
                let temp_dir = tempfile::tempdir()?;
                // Return the `TempDir` handle as well so it can be stored on
                // the returned server alongside the path.
                (temp_dir.path().to_path_buf(), Some(temp_dir))
            }
            Some(data_directory) => (data_directory, None),
        };
        let scratch_dir = tempfile::tempdir()?;
        let (consensus_uri, timestamp_oracle_url) = {
            let seed = config.seed;
            let cockroach_url = env::var("METADATA_BACKEND_URL")
                .map_err(|_| anyhow!("METADATA_BACKEND_URL environment variable is not set"))?;
            let (client, conn) = tokio_postgres::connect(&cockroach_url, NoTls).await?;
            // The connection object must be driven for `client` to make
            // progress; a connection error aborts the spawned task via panic.
            mz_ore::task::spawn(|| "startup-postgres-conn", async move {
                if let Err(err) = conn.await {
                    panic!("connection error: {}", err);
                };
            });
            // Create the schemas named after this harness's seed.
            client
                .batch_execute(&format!(
                    "CREATE SCHEMA IF NOT EXISTS consensus_{seed};
                    CREATE SCHEMA IF NOT EXISTS tsoracle_{seed};"
                ))
                .await?;
            // Point each URL at its schema via the `search_path` option.
            // NOTE(review): this appends `?options=...` unconditionally, so it
            // assumes the backend URL has no query string of its own.
            (
                format!("{cockroach_url}?options=--search_path=consensus_{seed}")
                    .parse()
                    .expect("invalid consensus URI"),
                format!("{cockroach_url}?options=--search_path=tsoracle_{seed}")
                    .parse()
                    .expect("invalid timestamp oracle URI"),
            )
        };
        let metrics_registry = config.metrics_registry.unwrap_or_else(MetricsRegistry::new);
        let orchestrator = ProcessOrchestrator::new(ProcessOrchestratorConfig {
            // Two directory levels above the currently running executable.
            image_dir: env::current_exe()?
                .parent()
                .unwrap()
                .parent()
                .unwrap()
                .to_path_buf(),
            suppress_output: false,
            environment_id: config.environment_id.to_string(),
            secrets_dir: data_directory.join("secrets"),
            command_wrapper: vec![],
            propagate_crashes: config.propagate_crashes,
            tcp_proxy: None,
            scratch_directory: scratch_dir.path().to_path_buf(),
        })
        .await?;
        let orchestrator = Arc::new(orchestrator);
        // Persist always uses the system clock here, not `config.now`.
        let persist_now = SYSTEM_TIME.clone();
        let dyncfgs = mz_dyncfgs::all_dyncfgs();

        let mut updates = ConfigUpdates::default();
        // Override the consensus connection pool's maximum size to 1.
        updates.add(&CONSENSUS_CONNECTION_POOL_MAX_SIZE, 1);
        updates.apply(&dyncfgs);

        let mut persist_cfg = PersistConfig::new(&crate::BUILD_INFO, persist_now.clone(), dyncfgs);
        persist_cfg.build_version = config.code_version;
        // NOTE(review): the reason for a rollup threshold of 5 is not visible
        // here — presumably chosen to suit tests; confirm.
        persist_cfg.set_rollup_threshold(5);

        let persist_pubsub_server = PersistGrpcPubSubServer::new(&persist_cfg, &metrics_registry);
        let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();
        // Bind the pubsub server to an OS-assigned loopback port and remember
        // the port for the controller's `persist_pubsub_url`.
        let persist_pubsub_tcp_listener =
            TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
                .await
                .expect("pubsub addr binding");
        let persist_pubsub_server_port = persist_pubsub_tcp_listener
            .local_addr()
            .expect("pubsub addr has local addr")
            .port();

        // Run the pubsub server in a background task.
        mz_ore::task::spawn(|| "persist_pubsub_server", async move {
            persist_pubsub_server
                .serve_with_stream(TcpListenerStream::new(persist_pubsub_tcp_listener))
                .await
                .expect("success")
        });
        let persist_clients =
            PersistClientCache::new(persist_cfg, &metrics_registry, |_, _| persist_pubsub_client);
        let persist_clients = Arc::new(persist_clients);

        // The process orchestrator also serves as the secrets controller and
        // provides the secrets reader for the connection context.
        let secrets_controller = Arc::clone(&orchestrator);
        let connection_context = ConnectionContext::for_tests(orchestrator.reader());
        let orchestrator = Arc::new(TracingOrchestrator::new(
            orchestrator,
            config.orchestrator_tracing_cli_args,
        ));
        let tracing_handle = if config.enable_tracing {
            let config = TracingConfig::<fn(&tracing::Metadata) -> sentry_tracing::EventFilter> {
                service_name: "environmentd",
                stderr_log: StderrLogConfig {
                    format: StderrLogFormat::Json,
                    filter: EnvFilter::default(),
                },
                opentelemetry: Some(OpenTelemetryConfig {
                    // Placeholder endpoint used for tests.
                    endpoint: "http://fake_address_for_testing:8080".to_string(),
                    headers: http::HeaderMap::new(),
                    filter: EnvFilter::default().add_directive(Level::DEBUG.into()),
                    resource: opentelemetry_sdk::resource::Resource::builder().build(),
                    max_batch_queue_size: 2048,
                    max_export_batch_size: 512,
                    max_concurrent_exports: 1,
                    batch_scheduled_delay: Duration::from_millis(5000),
                    max_export_timeout: Duration::from_secs(30),
                }),
                tokio_console: None,
                sentry: None,
                build_version: crate::BUILD_INFO.version,
                build_sha: crate::BUILD_INFO.sha,
                registry: metrics_registry.clone(),
                capture: config.capture,
            };
            mz_ore::tracing::configure(config).await?
        } else {
            TracingHandle::disabled()
        };
        // Advertise the external HTTP listener's actual port as the host name.
        let host_name = format!(
            "localhost:{}",
            self.inner.http["external"].handle.local_addr.port()
        );
        let catalog_config = CatalogConfig {
            persist_clients: Arc::clone(&persist_clients),
            // Catalog metrics are registered in a throwaway registry rather
            // than `metrics_registry`.
            metrics: Arc::new(mz_catalog::durable::Metrics::new(&MetricsRegistry::new())),
        };

        let inner = self
            .inner
            .serve(crate::Config {
                catalog_config,
                timestamp_oracle_url: Some(timestamp_oracle_url),
                controller: ControllerConfig {
                    build_info: &crate::BUILD_INFO,
                    orchestrator,
                    clusterd_image: "clusterd".into(),
                    init_container_image: None,
                    deploy_generation: config.deploy_generation,
                    persist_location: PersistLocation {
                        // Blob storage lives on the local filesystem under the
                        // data directory.
                        blob_uri: format!("file://{}/persist/blob", data_directory.display())
                            .parse()
                            .expect("invalid blob URI"),
                        consensus_uri,
                    },
                    persist_clients,
                    now: config.now.clone(),
                    metrics_registry: metrics_registry.clone(),
                    persist_pubsub_url: format!("http://localhost:{}", persist_pubsub_server_port),
                    secrets_args: mz_service::secrets::SecretsReaderCliArgs {
                        secrets_reader: mz_service::secrets::SecretsControllerKind::LocalFile,
                        secrets_reader_local_file_dir: Some(data_directory.join("secrets")),
                        secrets_reader_kubernetes_context: None,
                        secrets_reader_aws_prefix: None,
                        secrets_reader_name_prefix: None,
                    },
                    connection_context,
                    replica_http_locator: Default::default(),
                },
                secrets_controller,
                cloud_resource_controller: None,
                tls: config.tls,
                frontegg: config.frontegg,
                unsafe_mode: config.unsafe_mode,
                all_features: false,
                metrics_registry: metrics_registry.clone(),
                now: config.now,
                environment_id: config.environment_id,
                cors_allowed_origin: AllowOrigin::list([]),
                cors_allowed_origin_list: Vec::new(),
                cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
                bootstrap_default_cluster_replica_size: config.default_cluster_replica_size,
                bootstrap_default_cluster_replication_factor: config
                    .default_cluster_replication_factor,
                bootstrap_builtin_system_cluster_config: config.builtin_system_cluster_config,
                bootstrap_builtin_catalog_server_cluster_config: config
                    .builtin_catalog_server_cluster_config,
                bootstrap_builtin_probe_cluster_config: config.builtin_probe_cluster_config,
                bootstrap_builtin_support_cluster_config: config.builtin_support_cluster_config,
                bootstrap_builtin_analytics_cluster_config: config.builtin_analytics_cluster_config,
                system_parameter_defaults: config.system_parameter_defaults,
                availability_zones: Default::default(),
                tracing_handle,
                storage_usage_collection_interval: config.storage_usage_collection_interval,
                storage_usage_retention_period: config.storage_usage_retention_period,
                segment_api_key: None,
                segment_client_side: false,
                test_only_dummy_segment_client: false,
                egress_addresses: vec![],
                aws_account_id: None,
                aws_privatelink_availability_zones: None,
                launchdarkly_sdk_key: None,
                launchdarkly_key_map: Default::default(),
                config_sync_file_path: None,
                config_sync_timeout: Duration::from_secs(30),
                config_sync_loop_interval: None,
                bootstrap_role: config.bootstrap_role,
                http_host_name: Some(host_name),
                internal_console_redirect_url: config.internal_console_redirect_url,
                tls_reload_certs,
                helm_chart_version: None,
                license_key: ValidatedLicenseKey::for_tests(),
                external_login_password_mz_system: config.external_login_password_mz_system,
                force_builtin_schema_migration: None,
            })
            .await?;

        // Keep the temporary directories alive for as long as the server.
        Ok(TestServer {
            inner,
            metrics_registry,
            _temp_dir: temp_dir,
            _scratch_dir: scratch_dir,
        })
    }
924}
925
/// A running test server together with the resources that must outlive it.
pub struct TestServer {
    /// The underlying server handle.
    pub inner: crate::Server,
    /// The metrics registry the server was started with.
    pub metrics_registry: MetricsRegistry,
    /// Handle to the automatically created data directory; `None` when the
    /// harness was given an explicit data directory. Held so the handle is
    /// not dropped while the server is alive.
    _temp_dir: Option<TempDir>,
    /// Handle to the scratch directory given to the process orchestrator.
    /// Held so the handle is not dropped while the server is alive.
    _scratch_dir: TempDir,
}
934
935impl TestServer {
936 pub fn connect(&self) -> ConnectBuilder<'_, postgres::NoTls, NoHandle> {
937 ConnectBuilder::new(self).no_tls()
938 }
939
940 pub async fn enable_feature_flags(&self, flags: &[&'static str]) {
941 let internal_client = self.connect().internal().await.unwrap();
942
943 for flag in flags {
944 internal_client
945 .batch_execute(&format!("ALTER SYSTEM SET {} = true;", flag))
946 .await
947 .unwrap();
948 }
949 }
950
951 pub async fn disable_feature_flags(&self, flags: &[&'static str]) {
952 let internal_client = self.connect().internal().await.unwrap();
953
954 for flag in flags {
955 internal_client
956 .batch_execute(&format!("ALTER SYSTEM SET {} = false;", flag))
957 .await
958 .unwrap();
959 }
960 }
961
962 pub fn ws_addr(&self) -> Uri {
963 format!(
964 "ws://{}/api/experimental/sql",
965 self.inner.http_listener_handles["external"].local_addr
966 )
967 .parse()
968 .unwrap()
969 }
970
971 pub fn internal_ws_addr(&self) -> Uri {
972 format!(
973 "ws://{}/api/experimental/sql",
974 self.inner.http_listener_handles["internal"].local_addr
975 )
976 .parse()
977 .unwrap()
978 }
979
980 pub fn http_local_addr(&self) -> SocketAddr {
981 self.inner.http_listener_handles["external"].local_addr
982 }
983
984 pub fn internal_http_local_addr(&self) -> SocketAddr {
985 self.inner.http_listener_handles["internal"].local_addr
986 }
987
988 pub fn sql_local_addr(&self) -> SocketAddr {
989 self.inner.sql_listener_handles["external"].local_addr
990 }
991
992 pub fn internal_sql_local_addr(&self) -> SocketAddr {
993 self.inner.sql_listener_handles["internal"].local_addr
994 }
995}
996
/// Builder for a SQL client connection to a [`TestServer`].
///
/// `T` is the TLS connector in use and `H` is a marker type for the
/// `with_handle` option.
pub struct ConnectBuilder<'s, T, H> {
    /// The server to connect to.
    server: &'s TestServer,

    /// Connection parameters (host, user, options, ...).
    pg_config: tokio_postgres::Config,
    /// Port to connect to. Starts as the server's external SQL port and is
    /// switched to the internal SQL port by `internal`.
    port: u16,
    /// TLS connector (or `NoTls`) to use for the connection.
    tls: T,

    /// Optional callback that receives `DbError` values.
    // NOTE(review): presumably invoked for notices from the server — the call
    // site is not visible here; confirm.
    notice_callback: Option<Box<dyn FnMut(tokio_postgres::error::DbError) + Send + 'static>>,

    /// Marker value of type `H`.
    _with_handle: H,
}
1017
1018impl<'s> ConnectBuilder<'s, (), NoHandle> {
1019 fn new(server: &'s TestServer) -> Self {
1020 let mut pg_config = tokio_postgres::Config::new();
1021 pg_config
1022 .host(&Ipv4Addr::LOCALHOST.to_string())
1023 .user("materialize")
1024 .options("--welcome_message=off")
1025 .application_name("environmentd_test_framework");
1026
1027 ConnectBuilder {
1028 server,
1029 pg_config,
1030 port: server.sql_local_addr().port(),
1031 tls: (),
1032 notice_callback: None,
1033 _with_handle: NoHandle,
1034 }
1035 }
1036}
1037
1038impl<'s, T, H> ConnectBuilder<'s, T, H> {
1039 pub fn no_tls(self) -> ConnectBuilder<'s, postgres::NoTls, H> {
1043 ConnectBuilder {
1044 server: self.server,
1045 pg_config: self.pg_config,
1046 port: self.port,
1047 tls: postgres::NoTls,
1048 notice_callback: self.notice_callback,
1049 _with_handle: self._with_handle,
1050 }
1051 }
1052
1053 pub fn with_tls<Tls>(self, tls: Tls) -> ConnectBuilder<'s, Tls, H>
1055 where
1056 Tls: MakeTlsConnect<Socket> + Send + 'static,
1057 Tls::TlsConnect: Send,
1058 Tls::Stream: Send,
1059 <Tls::TlsConnect as TlsConnect<Socket>>::Future: Send,
1060 {
1061 ConnectBuilder {
1062 server: self.server,
1063 pg_config: self.pg_config,
1064 port: self.port,
1065 tls,
1066 notice_callback: self.notice_callback,
1067 _with_handle: self._with_handle,
1068 }
1069 }
1070
1071 pub fn with_config(mut self, pg_config: tokio_postgres::Config) -> Self {
1073 self.pg_config = pg_config;
1074 self
1075 }
1076
1077 pub fn ssl_mode(mut self, mode: SslMode) -> Self {
1079 self.pg_config.ssl_mode(mode);
1080 self
1081 }
1082
1083 pub fn user(mut self, user: &str) -> Self {
1085 self.pg_config.user(user);
1086 self
1087 }
1088
1089 pub fn password(mut self, password: &str) -> Self {
1091 self.pg_config.password(password);
1092 self
1093 }
1094
1095 pub fn application_name(mut self, application_name: &str) -> Self {
1097 self.pg_config.application_name(application_name);
1098 self
1099 }
1100
1101 pub fn dbname(mut self, dbname: &str) -> Self {
1103 self.pg_config.dbname(dbname);
1104 self
1105 }
1106
1107 pub fn options(mut self, options: &str) -> Self {
1109 self.pg_config.options(options);
1110 self
1111 }
1112
1113 pub fn internal(mut self) -> Self {
1118 self.port = self.server.internal_sql_local_addr().port();
1119 self.pg_config.user(mz_sql::session::user::SYSTEM_USER_NAME);
1120 self
1121 }
1122
1123 pub fn notice_callback(self, callback: impl FnMut(DbError) + Send + 'static) -> Self {
1125 ConnectBuilder {
1126 notice_callback: Some(Box::new(callback)),
1127 ..self
1128 }
1129 }
1130
1131 pub fn with_handle(self) -> ConnectBuilder<'s, T, WithHandle> {
1134 ConnectBuilder {
1135 server: self.server,
1136 pg_config: self.pg_config,
1137 port: self.port,
1138 tls: self.tls,
1139 notice_callback: self.notice_callback,
1140 _with_handle: WithHandle,
1141 }
1142 }
1143
1144 pub fn as_pg_config(&self) -> &tokio_postgres::Config {
1146 &self.pg_config
1147 }
1148}
1149
/// Decides what is produced from a connected client and a task handle.
pub trait IncludeHandle: Send {
    /// The value built by [`IncludeHandle::transform_result`].
    type Output;
    /// Builds [`IncludeHandle::Output`] from a connected `client` and the
    /// `handle` of a spawned task.
    fn transform_result(
        client: tokio_postgres::Client,
        handle: mz_ore::task::JoinHandle<()>,
    ) -> Self::Output;
}
1159
/// Marker type whose [`IncludeHandle`] implementation yields only the client.
pub struct NoHandle;
impl IncludeHandle for NoHandle {
    /// Only the client is produced.
    type Output = tokio_postgres::Client;
    /// Returns `client` as-is; the task handle is not passed on to the caller.
    fn transform_result(
        client: tokio_postgres::Client,
        _handle: mz_ore::task::JoinHandle<()>,
    ) -> Self::Output {
        client
    }
}
1172
/// Marker type whose [`IncludeHandle`] implementation yields the client
/// together with the task handle.
pub struct WithHandle;
impl IncludeHandle for WithHandle {
    /// The client paired with the task handle.
    type Output = (tokio_postgres::Client, mz_ore::task::JoinHandle<()>);
    /// Returns both arguments as a `(client, handle)` tuple.
    fn transform_result(
        client: tokio_postgres::Client,
        handle: mz_ore::task::JoinHandle<()>,
    ) -> Self::Output {
        (client, handle)
    }
}
1185
1186impl<'s, T, H> IntoFuture for ConnectBuilder<'s, T, H>
1187where
1188 T: MakeTlsConnect<Socket> + Send + 'static,
1189 T::TlsConnect: Send,
1190 T::Stream: Send,
1191 <T::TlsConnect as TlsConnect<Socket>>::Future: Send,
1192 H: IncludeHandle,
1193{
1194 type Output = Result<H::Output, postgres::Error>;
1195 type IntoFuture = BoxFuture<'static, Self::Output>;
1196
1197 fn into_future(mut self) -> Self::IntoFuture {
1198 Box::pin(async move {
1199 assert!(
1200 self.pg_config.get_ports().is_empty(),
1201 "specifying multiple ports is not supported"
1202 );
1203 self.pg_config.port(self.port);
1204
1205 let (client, mut conn) = self.pg_config.connect(self.tls).await?;
1206 let mut notice_callback = self.notice_callback.take();
1207
1208 let handle = task::spawn(|| "connect", async move {
1209 while let Some(msg) = std::future::poll_fn(|cx| conn.poll_message(cx)).await {
1210 match msg {
1211 Ok(AsyncMessage::Notice(notice)) => {
1212 if let Some(callback) = notice_callback.as_mut() {
1213 callback(notice);
1214 }
1215 }
1216 Ok(msg) => {
1217 tracing::debug!(?msg, "Dropping message from database");
1218 }
1219 Err(e) => {
1220 tracing::info!("connection error: {e}");
1225 break;
1226 }
1227 }
1228 }
1229 tracing::info!("connection closed");
1230 });
1231
1232 let output = H::transform_result(client, handle);
1233 Ok(output)
1234 })
1235 }
1236}
1237
/// A [`TestServer`] bundled with a shared handle to a `Runtime`.
///
/// NOTE(review): struct fields drop in declaration order, so `server` is
/// dropped before `runtime`; confirm whether callers rely on that ordering
/// before reordering the fields.
pub struct TestServerWithRuntime {
    /// The wrapped server.
    server: TestServer,
    /// The runtime handle exposed through `runtime()`.
    runtime: Arc<Runtime>,
}
1246
1247impl TestServerWithRuntime {
1248 pub fn runtime(&self) -> &Arc<Runtime> {
1252 &self.runtime
1253 }
1254
1255 pub fn inner(&self) -> &crate::Server {
1257 &self.server.inner
1258 }
1259
1260 pub fn connect<T>(&self, tls: T) -> Result<postgres::Client, postgres::Error>
1262 where
1263 T: MakeTlsConnect<Socket> + Send + 'static,
1264 T::TlsConnect: Send,
1265 T::Stream: Send,
1266 <T::TlsConnect as TlsConnect<Socket>>::Future: Send,
1267 {
1268 self.pg_config().connect(tls)
1269 }
1270
1271 pub fn connect_internal<T>(&self, tls: T) -> Result<postgres::Client, anyhow::Error>
1273 where
1274 T: MakeTlsConnect<Socket> + Send + 'static,
1275 T::TlsConnect: Send,
1276 T::Stream: Send,
1277 <T::TlsConnect as TlsConnect<Socket>>::Future: Send,
1278 {
1279 Ok(self.pg_config_internal().connect(tls)?)
1280 }
1281
1282 pub fn enable_feature_flags(&self, flags: &[&'static str]) {
1284 let mut internal_client = self.connect_internal(postgres::NoTls).unwrap();
1285
1286 for flag in flags {
1287 internal_client
1288 .batch_execute(&format!("ALTER SYSTEM SET {} = true;", flag))
1289 .unwrap();
1290 }
1291 }
1292
1293 pub fn disable_feature_flags(&self, flags: &[&'static str]) {
1295 let mut internal_client = self.connect_internal(postgres::NoTls).unwrap();
1296
1297 for flag in flags {
1298 internal_client
1299 .batch_execute(&format!("ALTER SYSTEM SET {} = false;", flag))
1300 .unwrap();
1301 }
1302 }
1303
1304 pub fn pg_config(&self) -> postgres::Config {
1307 let local_addr = self.server.sql_local_addr();
1308 let mut config = postgres::Config::new();
1309 config
1310 .host(&Ipv4Addr::LOCALHOST.to_string())
1311 .port(local_addr.port())
1312 .user("materialize")
1313 .options("--welcome_message=off");
1314 config
1315 }
1316
1317 pub fn pg_config_internal(&self) -> postgres::Config {
1320 let local_addr = self.server.internal_sql_local_addr();
1321 let mut config = postgres::Config::new();
1322 config
1323 .host(&Ipv4Addr::LOCALHOST.to_string())
1324 .port(local_addr.port())
1325 .user("mz_system")
1326 .options("--welcome_message=off");
1327 config
1328 }
1329
1330 pub fn ws_addr(&self) -> Uri {
1331 self.server.ws_addr()
1332 }
1333
1334 pub fn internal_ws_addr(&self) -> Uri {
1335 self.server.internal_ws_addr()
1336 }
1337
1338 pub fn http_local_addr(&self) -> SocketAddr {
1339 self.server.http_local_addr()
1340 }
1341
1342 pub fn internal_http_local_addr(&self) -> SocketAddr {
1343 self.server.internal_http_local_addr()
1344 }
1345
1346 pub fn sql_local_addr(&self) -> SocketAddr {
1347 self.server.sql_local_addr()
1348 }
1349
1350 pub fn internal_sql_local_addr(&self) -> SocketAddr {
1351 self.server.internal_sql_local_addr()
1352 }
1353
1354 pub fn metrics_registry(&self) -> &MetricsRegistry {
1356 &self.server.metrics_registry
1357 }
1358}
1359
/// A newtype wrapper around a `u64` timestamp value.
///
/// Ordering and equality are derived from the wrapped integer.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub struct MzTimestamp(pub u64);
1362
1363impl<'a> FromSql<'a> for MzTimestamp {
1364 fn from_sql(ty: &Type, raw: &'a [u8]) -> Result<MzTimestamp, Box<dyn Error + Sync + Send>> {
1365 let n = mz_pgrepr::Numeric::from_sql(ty, raw)?;
1366 Ok(MzTimestamp(u64::try_from(n.0.0)?))
1367 }
1368
1369 fn accepts(ty: &Type) -> bool {
1370 mz_pgrepr::Numeric::accepts(ty)
1371 }
1372}
1373
/// Extension trait for pulling a [`DbError`] out of a value.
pub trait PostgresErrorExt {
    /// Consumes `self` and returns the [`DbError`] it carries.
    fn unwrap_db_error(self) -> DbError;
}
1377
1378impl PostgresErrorExt for postgres::Error {
1379 fn unwrap_db_error(self) -> DbError {
1380 match self.source().and_then(|e| e.downcast_ref::<DbError>()) {
1381 Some(e) => e.clone(),
1382 None => panic!("expected DbError, but got: {:?}", self),
1383 }
1384 }
1385}
1386
1387impl<T, E> PostgresErrorExt for Result<T, E>
1388where
1389 E: PostgresErrorExt,
1390{
1391 fn unwrap_db_error(self) -> DbError {
1392 match self {
1393 Ok(_) => panic!("expected Err(DbError), but got Ok(_)"),
1394 Err(e) => e.unwrap_db_error(),
1395 }
1396 }
1397}
1398
1399pub async fn insert_with_deterministic_timestamps(
1403 table: &'static str,
1404 values: &'static str,
1405 server: &TestServer,
1406 now: Arc<std::sync::Mutex<EpochMillis>>,
1407) -> Result<(), Box<dyn Error>> {
1408 let client_write = server.connect().await?;
1409 let client_read = server.connect().await?;
1410
1411 let mut current_ts = get_explain_timestamp(table, &client_read).await;
1412
1413 let insert_query = format!("INSERT INTO {table} VALUES {values}");
1414
1415 let write_future = client_write.execute(&insert_query, &[]);
1416 let timestamp_interval = tokio::time::interval(Duration::from_millis(1));
1417
1418 let mut write_future = std::pin::pin!(write_future);
1419 let mut timestamp_interval = std::pin::pin!(timestamp_interval);
1420
1421 loop {
1424 tokio::select! {
1425 _ = (&mut write_future) => return Ok(()),
1426 _ = timestamp_interval.tick() => {
1427 current_ts += 1;
1428 *now.lock().expect("lock poisoned") = current_ts;
1429 }
1430 };
1431 }
1432}
1433
1434pub async fn get_explain_timestamp(from_suffix: &str, client: &Client) -> EpochMillis {
1435 try_get_explain_timestamp(from_suffix, client)
1436 .await
1437 .unwrap()
1438}
1439
1440pub async fn try_get_explain_timestamp(
1441 from_suffix: &str,
1442 client: &Client,
1443) -> Result<EpochMillis, anyhow::Error> {
1444 let det = get_explain_timestamp_determination(from_suffix, client).await?;
1445 let ts = det.determination.timestamp_context.timestamp_or_default();
1446 Ok(ts.into())
1447}
1448
1449pub async fn get_explain_timestamp_determination(
1450 from_suffix: &str,
1451 client: &Client,
1452) -> Result<TimestampExplanation, anyhow::Error> {
1453 let row = client
1454 .query_one(
1455 &format!("EXPLAIN TIMESTAMP AS JSON FOR SELECT * FROM {from_suffix}"),
1456 &[],
1457 )
1458 .await?;
1459 let explain: String = row.get(0);
1460 Ok(serde_json::from_str(&explain).unwrap())
1461}
1462
1463pub async fn create_postgres_source_with_table<'a>(
1471 server: &TestServer,
1472 mz_client: &Client,
1473 table_name: &str,
1474 table_schema: &str,
1475 source_name: &str,
1476) -> (
1477 Client,
1478 impl FnOnce(&'a Client, &'a Client) -> LocalBoxFuture<'a, ()>,
1479) {
1480 server
1481 .enable_feature_flags(&["enable_create_table_from_source"])
1482 .await;
1483
1484 let postgres_url = env::var("POSTGRES_URL")
1485 .map_err(|_| anyhow!("POSTGRES_URL environment variable is not set"))
1486 .unwrap();
1487
1488 let (pg_client, connection) = tokio_postgres::connect(&postgres_url, postgres::NoTls)
1489 .await
1490 .unwrap();
1491
1492 let pg_config: tokio_postgres::Config = postgres_url.parse().unwrap();
1493 let user = pg_config.get_user().unwrap_or("postgres");
1494 let db_name = pg_config.get_dbname().unwrap_or(user);
1495 let ports = pg_config.get_ports();
1496 let port = if ports.is_empty() { 5432 } else { ports[0] };
1497 let hosts = pg_config.get_hosts();
1498 let host = if hosts.is_empty() {
1499 "localhost".to_string()
1500 } else {
1501 match &hosts[0] {
1502 Host::Tcp(host) => host.to_string(),
1503 Host::Unix(host) => host.to_str().unwrap().to_string(),
1504 }
1505 };
1506 let password = pg_config.get_password();
1507
1508 mz_ore::task::spawn(|| "postgres-source-connection", async move {
1509 if let Err(e) = connection.await {
1510 panic!("connection error: {}", e);
1511 }
1512 });
1513
1514 let _ = pg_client
1516 .execute(&format!("DROP TABLE IF EXISTS {table_name};"), &[])
1517 .await
1518 .unwrap();
1519 let _ = pg_client
1520 .execute(&format!("DROP PUBLICATION IF EXISTS {source_name};"), &[])
1521 .await
1522 .unwrap();
1523 let _ = pg_client
1524 .execute(&format!("CREATE TABLE {table_name} {table_schema};"), &[])
1525 .await
1526 .unwrap();
1527 let _ = pg_client
1528 .execute(
1529 &format!("ALTER TABLE {table_name} REPLICA IDENTITY FULL;"),
1530 &[],
1531 )
1532 .await
1533 .unwrap();
1534 let _ = pg_client
1535 .execute(
1536 &format!("CREATE PUBLICATION {source_name} FOR TABLE {table_name};"),
1537 &[],
1538 )
1539 .await
1540 .unwrap();
1541
1542 let mut connection_str = format!("HOST '{host}', PORT {port}, USER {user}, DATABASE {db_name}");
1544 if let Some(password) = password {
1545 let password = std::str::from_utf8(password).unwrap();
1546 mz_client
1547 .batch_execute(&format!("CREATE SECRET s AS '{password}'"))
1548 .await
1549 .unwrap();
1550 connection_str = format!("{connection_str}, PASSWORD SECRET s");
1551 }
1552 mz_client
1553 .batch_execute(&format!(
1554 "CREATE CONNECTION pgconn TO POSTGRES ({connection_str})"
1555 ))
1556 .await
1557 .unwrap();
1558 mz_client
1559 .batch_execute(&format!(
1560 "CREATE SOURCE {source_name}
1561 FROM POSTGRES
1562 CONNECTION pgconn
1563 (PUBLICATION '{source_name}')"
1564 ))
1565 .await
1566 .unwrap();
1567 mz_client
1568 .batch_execute(&format!(
1569 "CREATE TABLE {table_name}
1570 FROM SOURCE {source_name}
1571 (REFERENCE {table_name});"
1572 ))
1573 .await
1574 .unwrap();
1575
1576 let table_name = table_name.to_string();
1577 let source_name = source_name.to_string();
1578 (
1579 pg_client,
1580 move |mz_client: &'a Client, pg_client: &'a Client| {
1581 let f: Pin<Box<dyn Future<Output = ()> + 'a>> = Box::pin(async move {
1582 mz_client
1583 .batch_execute(&format!("DROP SOURCE {source_name} CASCADE;"))
1584 .await
1585 .unwrap();
1586 mz_client
1587 .batch_execute("DROP CONNECTION pgconn;")
1588 .await
1589 .unwrap();
1590
1591 let _ = pg_client
1592 .execute(&format!("DROP PUBLICATION {source_name};"), &[])
1593 .await
1594 .unwrap();
1595 let _ = pg_client
1596 .execute(&format!("DROP TABLE {table_name};"), &[])
1597 .await
1598 .unwrap();
1599 });
1600 f
1601 },
1602 )
1603}
1604
1605pub async fn wait_for_pg_table_population(mz_client: &Client, view_name: &str, source_rows: i64) {
1606 let current_isolation = mz_client
1607 .query_one("SHOW transaction_isolation", &[])
1608 .await
1609 .unwrap()
1610 .get::<_, String>(0);
1611 mz_client
1612 .batch_execute("SET transaction_isolation = SERIALIZABLE")
1613 .await
1614 .unwrap();
1615 Retry::default()
1616 .retry_async(|_| async move {
1617 let rows = mz_client
1618 .query_one(&format!("SELECT COUNT(*) FROM {view_name};"), &[])
1619 .await
1620 .unwrap()
1621 .get::<_, i64>(0);
1622 if rows == source_rows {
1623 Ok(())
1624 } else {
1625 Err(format!(
1626 "Waiting for {source_rows} row to be ingested. Currently at {rows}."
1627 ))
1628 }
1629 })
1630 .await
1631 .unwrap();
1632 mz_client
1633 .batch_execute(&format!(
1634 "SET transaction_isolation = '{current_isolation}'"
1635 ))
1636 .await
1637 .unwrap();
1638}
1639
1640pub fn auth_with_ws(
1642 ws: &mut WebSocket<MaybeTlsStream<TcpStream>>,
1643 mut options: BTreeMap<String, String>,
1644) -> Result<Vec<WebSocketResponse>, anyhow::Error> {
1645 if !options.contains_key("welcome_message") {
1646 options.insert("welcome_message".into(), "off".into());
1647 }
1648 auth_with_ws_impl(
1649 ws,
1650 Message::Text(
1651 serde_json::to_string(&WebSocketAuth::Basic {
1652 user: "materialize".into(),
1653 password: "".into(),
1654 options,
1655 })
1656 .unwrap()
1657 .into(),
1658 ),
1659 )
1660}
1661
1662pub fn auth_with_ws_impl(
1663 ws: &mut WebSocket<MaybeTlsStream<TcpStream>>,
1664 auth_message: Message,
1665) -> Result<Vec<WebSocketResponse>, anyhow::Error> {
1666 ws.send(auth_message)?;
1667
1668 let mut msgs = Vec::new();
1670 loop {
1671 let resp = ws.read()?;
1672 match resp {
1673 Message::Text(msg) => {
1674 let msg: WebSocketResponse = serde_json::from_str(&msg).unwrap();
1675 match msg {
1676 WebSocketResponse::ReadyForQuery(_) => break,
1677 msg => {
1678 msgs.push(msg);
1679 }
1680 }
1681 }
1682 Message::Ping(_) => continue,
1683 Message::Close(None) => return Err(anyhow!("ws closed after auth")),
1684 Message::Close(Some(close_frame)) => {
1685 return Err(anyhow!("ws closed after auth").context(close_frame));
1686 }
1687 _ => panic!("unexpected response: {:?}", resp),
1688 }
1689 }
1690 Ok(msgs)
1691}
1692
1693pub fn make_header<H: Header>(h: H) -> HeaderMap {
1694 let mut map = HeaderMap::new();
1695 map.typed_insert(h);
1696 map
1697}
1698
1699pub fn make_pg_tls<F>(configure: F) -> MakeTlsConnector
1700where
1701 F: FnOnce(&mut SslConnectorBuilder) -> Result<(), ErrorStack>,
1702{
1703 let mut connector_builder = SslConnector::builder(SslMethod::tls()).unwrap();
1704 let options = connector_builder.options() | SslOptions::NO_TLSV1_3;
1725 connector_builder.set_options(options);
1726 configure(&mut connector_builder).unwrap();
1727 MakeTlsConnector::new(connector_builder.build())
1728}
1729
/// A certificate authority whose certificate is written to a temporary
/// directory.
pub struct Ca {
    /// The temporary directory that holds the CA's files.
    pub dir: TempDir,
    /// The X.509 name of the CA.
    pub name: X509Name,
    /// The CA's certificate.
    pub cert: X509,
    /// The CA's private key.
    pub pkey: PKey<Private>,
}
1737
1738impl Ca {
1739 fn make_ca(name: &str, parent: Option<&Ca>) -> Result<Ca, Box<dyn Error>> {
1740 let dir = tempfile::tempdir()?;
1741 let rsa = Rsa::generate(2048)?;
1742 let pkey = PKey::from_rsa(rsa)?;
1743 let name = {
1744 let mut builder = X509NameBuilder::new()?;
1745 builder.append_entry_by_nid(Nid::COMMONNAME, name)?;
1746 builder.build()
1747 };
1748 let cert = {
1749 let mut builder = X509::builder()?;
1750 builder.set_version(2)?;
1751 builder.set_pubkey(&pkey)?;
1752 builder.set_issuer_name(parent.map(|ca| &ca.name).unwrap_or(&name))?;
1753 builder.set_subject_name(&name)?;
1754 builder.set_not_before(&*Asn1Time::days_from_now(0)?)?;
1755 builder.set_not_after(&*Asn1Time::days_from_now(365)?)?;
1756 builder.append_extension(BasicConstraints::new().critical().ca().build()?)?;
1757 builder.sign(
1758 parent.map(|ca| &ca.pkey).unwrap_or(&pkey),
1759 MessageDigest::sha256(),
1760 )?;
1761 builder.build()
1762 };
1763 fs::write(dir.path().join("ca.crt"), cert.to_pem()?)?;
1764 Ok(Ca {
1765 dir,
1766 name,
1767 cert,
1768 pkey,
1769 })
1770 }
1771
1772 pub fn new_root(name: &str) -> Result<Ca, Box<dyn Error>> {
1774 Ca::make_ca(name, None)
1775 }
1776
1777 pub fn ca_cert_path(&self) -> PathBuf {
1779 self.dir.path().join("ca.crt")
1780 }
1781
1782 pub fn request_ca(&self, name: &str) -> Result<Ca, Box<dyn Error>> {
1784 Ca::make_ca(name, Some(self))
1785 }
1786
1787 pub fn request_client_cert(&self, name: &str) -> Result<(PathBuf, PathBuf), Box<dyn Error>> {
1792 self.request_cert(name, iter::empty())
1793 }
1794
1795 pub fn request_cert<I>(&self, name: &str, ips: I) -> Result<(PathBuf, PathBuf), Box<dyn Error>>
1798 where
1799 I: IntoIterator<Item = IpAddr>,
1800 {
1801 let rsa = Rsa::generate(2048)?;
1802 let pkey = PKey::from_rsa(rsa)?;
1803 let subject_name = {
1804 let mut builder = X509NameBuilder::new()?;
1805 builder.append_entry_by_nid(Nid::COMMONNAME, name)?;
1806 builder.build()
1807 };
1808 let cert = {
1809 let mut builder = X509::builder()?;
1810 builder.set_version(2)?;
1811 builder.set_pubkey(&pkey)?;
1812 builder.set_issuer_name(self.cert.subject_name())?;
1813 builder.set_subject_name(&subject_name)?;
1814 builder.set_not_before(&*Asn1Time::days_from_now(0)?)?;
1815 builder.set_not_after(&*Asn1Time::days_from_now(365)?)?;
1816 for ip in ips {
1817 builder.append_extension(
1818 SubjectAlternativeName::new()
1819 .ip(&ip.to_string())
1820 .build(&builder.x509v3_context(None, None))?,
1821 )?;
1822 }
1823 builder.sign(&self.pkey, MessageDigest::sha256())?;
1824 builder.build()
1825 };
1826 let cert_path = self.dir.path().join(Path::new(name).with_extension("crt"));
1827 let key_path = self.dir.path().join(Path::new(name).with_extension("key"));
1828 fs::write(&cert_path, cert.to_pem()?)?;
1829 fs::write(&key_path, pkey.private_key_to_pem_pkcs8()?)?;
1830 Ok((cert_path, key_path))
1831 }
1832}