1use std::sync::{Arc, Mutex};
13
14use mz_cluster::client::{ClusterClient, ClusterSpec};
15use mz_cluster_client::client::TimelyConfig;
16use mz_ore::metrics::MetricsRegistry;
17use mz_ore::now::NowFn;
18use mz_ore::tracing::TracingHandle;
19use mz_persist_client::cache::PersistClientCache;
20use mz_rocksdb::config::SharedWriteBufferManager;
21use mz_storage_client::client::{StorageClient, StorageCommand, StorageResponse};
22use mz_storage_types::connections::ConnectionContext;
23use mz_txn_wal::operator::TxnsContext;
24use timely::worker::Worker as TimelyWorker;
25use tokio::sync::mpsc;
26use uuid::Uuid;
27
28use crate::metrics::StorageMetrics;
29use crate::storage_state::{StorageInstanceContext, Worker};
30
31#[derive(Clone)]
33struct Config {
34 pub persist_clients: Arc<PersistClientCache>,
36 pub txns_ctx: TxnsContext,
38 pub tracing_handle: Arc<TracingHandle>,
40 pub now: NowFn,
42 pub connection_context: ConnectionContext,
44 pub instance_context: StorageInstanceContext,
46
47 pub metrics: StorageMetrics,
49 pub shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
51}
52
53pub async fn serve(
55 timely_config: TimelyConfig,
56 metrics_registry: &MetricsRegistry,
57 persist_clients: Arc<PersistClientCache>,
58 txns_ctx: TxnsContext,
59 tracing_handle: Arc<TracingHandle>,
60 now: NowFn,
61 connection_context: ConnectionContext,
62 instance_context: StorageInstanceContext,
63) -> Result<impl Fn() -> Box<dyn StorageClient> + use<>, anyhow::Error> {
64 let config = Config {
65 persist_clients,
66 txns_ctx,
67 tracing_handle,
68 now,
69 connection_context,
70 instance_context,
71 metrics: StorageMetrics::register_with(metrics_registry),
72 shared_rocksdb_write_buffer_manager: Default::default(),
76 };
77 let tokio_executor = tokio::runtime::Handle::current();
78
79 let timely_container = config.build_cluster(timely_config, tokio_executor).await?;
80 let timely_container = Arc::new(Mutex::new(timely_container));
81
82 let client_builder = move || {
83 let client = ClusterClient::new(Arc::clone(&timely_container));
84 let client: Box<dyn StorageClient> = Box::new(client);
85 client
86 };
87
88 Ok(client_builder)
89}
90
91impl ClusterSpec for Config {
92 type Command = StorageCommand;
93 type Response = StorageResponse;
94
95 const NAME: &str = "storage";
96
97 fn run_worker(
98 &self,
99 timely_worker: &mut TimelyWorker,
100 client_rx: mpsc::UnboundedReceiver<(
101 Uuid,
102 mpsc::UnboundedReceiver<StorageCommand>,
103 mpsc::UnboundedSender<StorageResponse>,
104 )>,
105 ) {
106 Worker::new(
107 timely_worker,
108 client_rx,
109 self.metrics.clone(),
110 self.now.clone(),
111 self.connection_context.clone(),
112 self.instance_context.clone(),
113 Arc::clone(&self.persist_clients),
114 self.txns_ctx.clone(),
115 Arc::clone(&self.tracing_handle),
116 self.shared_rocksdb_write_buffer_manager.clone(),
117 )
118 .run();
119 }
120}