1use std::sync::{Arc, Mutex};
13
14use mz_cluster::client::{ClusterClient, ClusterSpec};
15use mz_cluster_client::client::TimelyConfig;
16use mz_ore::metrics::MetricsRegistry;
17use mz_ore::now::NowFn;
18use mz_ore::tracing::TracingHandle;
19use mz_persist_client::cache::PersistClientCache;
20use mz_rocksdb::config::SharedWriteBufferManager;
21use mz_storage_client::client::{StorageClient, StorageCommand, StorageResponse};
22use mz_storage_types::connections::ConnectionContext;
23use mz_txn_wal::operator::TxnsContext;
24use timely::communication::Allocate;
25use timely::worker::Worker as TimelyWorker;
26use tokio::sync::mpsc;
27use uuid::Uuid;
28
29use crate::metrics::StorageMetrics;
30use crate::storage_state::{StorageInstanceContext, Worker};
31
32#[derive(Clone)]
34struct Config {
35 pub persist_clients: Arc<PersistClientCache>,
37 pub txns_ctx: TxnsContext,
39 pub tracing_handle: Arc<TracingHandle>,
41 pub now: NowFn,
43 pub connection_context: ConnectionContext,
45 pub instance_context: StorageInstanceContext,
47
48 pub metrics: StorageMetrics,
50 pub shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
52}
53
54pub async fn serve(
56 timely_config: TimelyConfig,
57 metrics_registry: &MetricsRegistry,
58 persist_clients: Arc<PersistClientCache>,
59 txns_ctx: TxnsContext,
60 tracing_handle: Arc<TracingHandle>,
61 now: NowFn,
62 connection_context: ConnectionContext,
63 instance_context: StorageInstanceContext,
64) -> Result<impl Fn() -> Box<dyn StorageClient> + use<>, anyhow::Error> {
65 let config = Config {
66 persist_clients,
67 txns_ctx,
68 tracing_handle,
69 now,
70 connection_context,
71 instance_context,
72 metrics: StorageMetrics::register_with(metrics_registry),
73 shared_rocksdb_write_buffer_manager: Default::default(),
77 };
78 let tokio_executor = tokio::runtime::Handle::current();
79
80 let timely_container = config.build_cluster(timely_config, tokio_executor).await?;
81 let timely_container = Arc::new(Mutex::new(timely_container));
82
83 let client_builder = move || {
84 let client = ClusterClient::new(Arc::clone(&timely_container));
85 let client: Box<dyn StorageClient> = Box::new(client);
86 client
87 };
88
89 Ok(client_builder)
90}
91
92impl ClusterSpec for Config {
93 type Command = StorageCommand;
94 type Response = StorageResponse;
95
96 const NAME: &str = "storage";
97
98 fn run_worker<A: Allocate + 'static>(
99 &self,
100 timely_worker: &mut TimelyWorker<A>,
101 client_rx: mpsc::UnboundedReceiver<(
102 Uuid,
103 mpsc::UnboundedReceiver<StorageCommand>,
104 mpsc::UnboundedSender<StorageResponse>,
105 )>,
106 ) {
107 Worker::new(
108 timely_worker,
109 client_rx,
110 self.metrics.clone(),
111 self.now.clone(),
112 self.connection_context.clone(),
113 self.instance_context.clone(),
114 Arc::clone(&self.persist_clients),
115 self.txns_ctx.clone(),
116 Arc::clone(&self.tracing_handle),
117 self.shared_rocksdb_write_buffer_manager.clone(),
118 )
119 .run();
120 }
121}