1use std::sync::{Arc, Mutex};
13
14use mz_cluster::client::{ClusterClient, ClusterSpec};
15use mz_cluster_client::client::TimelyConfig;
16use mz_ore::metrics::MetricsRegistry;
17use mz_ore::now::NowFn;
18use mz_ore::tracing::TracingHandle;
19use mz_persist_client::cache::PersistClientCache;
20use mz_rocksdb::config::SharedWriteBufferManager;
21use mz_storage_client::client::{StorageClient, StorageCommand, StorageResponse};
22use mz_storage_types::connections::ConnectionContext;
23use mz_txn_wal::operator::TxnsContext;
24use timely::communication::Allocate;
25use timely::worker::Worker as TimelyWorker;
26use tokio::sync::mpsc;
27use uuid::Uuid;
28
29use crate::metrics::StorageMetrics;
30use crate::storage_state::{StorageInstanceContext, Worker};
31
32#[derive(Clone)]
34struct Config {
35 pub persist_clients: Arc<PersistClientCache>,
37 pub txns_ctx: TxnsContext,
39 pub tracing_handle: Arc<TracingHandle>,
41 pub now: NowFn,
43 pub connection_context: ConnectionContext,
45 pub instance_context: StorageInstanceContext,
47
48 pub metrics: StorageMetrics,
50 pub shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
52}
53
54pub async fn serve(
56 timely_config: TimelyConfig,
57 metrics_registry: &MetricsRegistry,
58 persist_clients: Arc<PersistClientCache>,
59 txns_ctx: TxnsContext,
60 tracing_handle: Arc<TracingHandle>,
61 now: NowFn,
62 connection_context: ConnectionContext,
63 instance_context: StorageInstanceContext,
64) -> Result<impl Fn() -> Box<dyn StorageClient> + use<>, anyhow::Error> {
65 let config = Config {
66 persist_clients,
67 txns_ctx,
68 tracing_handle,
69 now,
70 connection_context,
71 instance_context,
72 metrics: StorageMetrics::register_with(metrics_registry),
73 shared_rocksdb_write_buffer_manager: Default::default(),
77 };
78 let tokio_executor = tokio::runtime::Handle::current();
79
80 let timely_container = config.build_cluster(timely_config, tokio_executor).await?;
81 let timely_container = Arc::new(Mutex::new(timely_container));
82
83 let client_builder = move || {
84 let client = ClusterClient::new(Arc::clone(&timely_container));
85 let client: Box<dyn StorageClient> = Box::new(client);
86 client
87 };
88
89 Ok(client_builder)
90}
91
92impl ClusterSpec for Config {
93 type Command = StorageCommand;
94 type Response = StorageResponse;
95
96 fn run_worker<A: Allocate + 'static>(
97 &self,
98 timely_worker: &mut TimelyWorker<A>,
99 client_rx: crossbeam_channel::Receiver<(
100 Uuid,
101 crossbeam_channel::Receiver<StorageCommand>,
102 mpsc::UnboundedSender<StorageResponse>,
103 )>,
104 ) {
105 Worker::new(
106 timely_worker,
107 client_rx,
108 self.metrics.clone(),
109 self.now.clone(),
110 self.connection_context.clone(),
111 self.instance_context.clone(),
112 Arc::clone(&self.persist_clients),
113 self.txns_ctx.clone(),
114 Arc::clone(&self.tracing_handle),
115 self.shared_rocksdb_write_buffer_manager.clone(),
116 )
117 .run();
118 }
119}