Skip to main content

mz_storage/
server.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An interactive dataflow server.
11
12use std::sync::{Arc, Mutex};
13
14use mz_cluster::client::{ClusterClient, ClusterSpec};
15use mz_cluster_client::client::TimelyConfig;
16use mz_ore::metrics::MetricsRegistry;
17use mz_ore::now::NowFn;
18use mz_ore::tracing::TracingHandle;
19use mz_persist_client::cache::PersistClientCache;
20use mz_rocksdb::config::SharedWriteBufferManager;
21use mz_storage_client::client::{StorageClient, StorageCommand, StorageResponse};
22use mz_storage_types::connections::ConnectionContext;
23use mz_txn_wal::operator::TxnsContext;
24use timely::worker::Worker as TimelyWorker;
25use tokio::sync::mpsc;
26use uuid::Uuid;
27
28use crate::metrics::StorageMetrics;
29use crate::storage_state::{StorageInstanceContext, Worker};
30
31/// Configures a dataflow server.
32#[derive(Clone)]
33struct Config {
34    /// `persist` client cache.
35    pub persist_clients: Arc<PersistClientCache>,
36    /// Context necessary for rendering txn-wal operators.
37    pub txns_ctx: TxnsContext,
38    /// A process-global handle to tracing configuration.
39    pub tracing_handle: Arc<TracingHandle>,
40    /// Function to get wall time now.
41    pub now: NowFn,
42    /// Configuration for source and sink connection.
43    pub connection_context: ConnectionContext,
44    /// Other configuration for storage instances.
45    pub instance_context: StorageInstanceContext,
46
47    /// Metrics for storage
48    pub metrics: StorageMetrics,
49    /// Shared rocksdb write buffer manager
50    pub shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
51}
52
53/// Initiates a timely dataflow computation, processing storage commands.
54pub async fn serve(
55    timely_config: TimelyConfig,
56    metrics_registry: &MetricsRegistry,
57    persist_clients: Arc<PersistClientCache>,
58    txns_ctx: TxnsContext,
59    tracing_handle: Arc<TracingHandle>,
60    now: NowFn,
61    connection_context: ConnectionContext,
62    instance_context: StorageInstanceContext,
63) -> Result<impl Fn() -> Box<dyn StorageClient> + use<>, anyhow::Error> {
64    let config = Config {
65        persist_clients,
66        txns_ctx,
67        tracing_handle,
68        now,
69        connection_context,
70        instance_context,
71        metrics: StorageMetrics::register_with(metrics_registry),
72        // The shared RocksDB `WriteBufferManager` is shared between the workers.
73        // It protects (behind a shared mutex) a `Weak` that will be upgraded and shared when the
74        // first worker attempts to initialize it.
75        shared_rocksdb_write_buffer_manager: Default::default(),
76    };
77    let tokio_executor = tokio::runtime::Handle::current();
78
79    let timely_container = config.build_cluster(timely_config, tokio_executor).await?;
80    let timely_container = Arc::new(Mutex::new(timely_container));
81
82    let client_builder = move || {
83        let client = ClusterClient::new(Arc::clone(&timely_container));
84        let client: Box<dyn StorageClient> = Box::new(client);
85        client
86    };
87
88    Ok(client_builder)
89}
90
91impl ClusterSpec for Config {
92    type Command = StorageCommand;
93    type Response = StorageResponse;
94
95    const NAME: &str = "storage";
96
97    fn run_worker(
98        &self,
99        timely_worker: &mut TimelyWorker,
100        client_rx: mpsc::UnboundedReceiver<(
101            Uuid,
102            mpsc::UnboundedReceiver<StorageCommand>,
103            mpsc::UnboundedSender<StorageResponse>,
104        )>,
105    ) {
106        Worker::new(
107            timely_worker,
108            client_rx,
109            self.metrics.clone(),
110            self.now.clone(),
111            self.connection_context.clone(),
112            self.instance_context.clone(),
113            Arc::clone(&self.persist_clients),
114            self.txns_ctx.clone(),
115            Arc::clone(&self.tracing_handle),
116            self.shared_rocksdb_write_buffer_manager.clone(),
117        )
118        .run();
119    }
120}