mz_storage/
server.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An interactive dataflow server.
11
12use std::sync::{Arc, Mutex};
13
14use mz_cluster::client::{ClusterClient, ClusterSpec};
15use mz_cluster_client::client::TimelyConfig;
16use mz_ore::metrics::MetricsRegistry;
17use mz_ore::now::NowFn;
18use mz_ore::tracing::TracingHandle;
19use mz_persist_client::cache::PersistClientCache;
20use mz_rocksdb::config::SharedWriteBufferManager;
21use mz_storage_client::client::{StorageClient, StorageCommand, StorageResponse};
22use mz_storage_types::connections::ConnectionContext;
23use mz_txn_wal::operator::TxnsContext;
24use timely::communication::Allocate;
25use timely::worker::Worker as TimelyWorker;
26use tokio::sync::mpsc;
27use uuid::Uuid;
28
29use crate::metrics::StorageMetrics;
30use crate::storage_state::{StorageInstanceContext, Worker};
31
32/// Configures a dataflow server.
33#[derive(Clone)]
34struct Config {
35    /// `persist` client cache.
36    pub persist_clients: Arc<PersistClientCache>,
37    /// Context necessary for rendering txn-wal operators.
38    pub txns_ctx: TxnsContext,
39    /// A process-global handle to tracing configuration.
40    pub tracing_handle: Arc<TracingHandle>,
41    /// Function to get wall time now.
42    pub now: NowFn,
43    /// Configuration for source and sink connection.
44    pub connection_context: ConnectionContext,
45    /// Other configuration for storage instances.
46    pub instance_context: StorageInstanceContext,
47
48    /// Metrics for storage
49    pub metrics: StorageMetrics,
50    /// Shared rocksdb write buffer manager
51    pub shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
52}
53
54/// Initiates a timely dataflow computation, processing storage commands.
55pub async fn serve(
56    timely_config: TimelyConfig,
57    metrics_registry: &MetricsRegistry,
58    persist_clients: Arc<PersistClientCache>,
59    txns_ctx: TxnsContext,
60    tracing_handle: Arc<TracingHandle>,
61    now: NowFn,
62    connection_context: ConnectionContext,
63    instance_context: StorageInstanceContext,
64) -> Result<impl Fn() -> Box<dyn StorageClient> + use<>, anyhow::Error> {
65    let config = Config {
66        persist_clients,
67        txns_ctx,
68        tracing_handle,
69        now,
70        connection_context,
71        instance_context,
72        metrics: StorageMetrics::register_with(metrics_registry),
73        // The shared RocksDB `WriteBufferManager` is shared between the workers.
74        // It protects (behind a shared mutex) a `Weak` that will be upgraded and shared when the
75        // first worker attempts to initialize it.
76        shared_rocksdb_write_buffer_manager: Default::default(),
77    };
78    let tokio_executor = tokio::runtime::Handle::current();
79
80    let timely_container = config.build_cluster(timely_config, tokio_executor).await?;
81    let timely_container = Arc::new(Mutex::new(timely_container));
82
83    let client_builder = move || {
84        let client = ClusterClient::new(Arc::clone(&timely_container));
85        let client: Box<dyn StorageClient> = Box::new(client);
86        client
87    };
88
89    Ok(client_builder)
90}
91
92impl ClusterSpec for Config {
93    type Command = StorageCommand;
94    type Response = StorageResponse;
95
96    fn run_worker<A: Allocate + 'static>(
97        &self,
98        timely_worker: &mut TimelyWorker<A>,
99        client_rx: crossbeam_channel::Receiver<(
100            Uuid,
101            crossbeam_channel::Receiver<StorageCommand>,
102            mpsc::UnboundedSender<StorageResponse>,
103        )>,
104    ) {
105        Worker::new(
106            timely_worker,
107            client_rx,
108            self.metrics.clone(),
109            self.now.clone(),
110            self.connection_context.clone(),
111            self.instance_context.clone(),
112            Arc::clone(&self.persist_clients),
113            self.txns_ctx.clone(),
114            Arc::clone(&self.tracing_handle),
115            self.shared_rocksdb_write_buffer_manager.clone(),
116        )
117        .run();
118    }
119}