// mz_storage/server.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An interactive dataflow server.
11
12use std::sync::{Arc, Mutex};
13use std::time::Duration;
14
15use mz_cluster::client::{ClusterClient, ClusterSpec};
16use mz_cluster_client::client::TimelyConfig;
17use mz_ore::metrics::MetricsRegistry;
18use mz_ore::now::NowFn;
19use mz_ore::tracing::TracingHandle;
20use mz_persist_client::cache::PersistClientCache;
21use mz_rocksdb::config::SharedWriteBufferManager;
22use mz_storage_client::client::{StorageClient, StorageCommand, StorageResponse};
23use mz_storage_types::connections::ConnectionContext;
24use mz_timely_util::capture::EventLink;
25use mz_txn_wal::operator::TxnsContext;
26use timely::logging::{
27    ChannelsEvent, MessagesEvent, OperatesEvent, ScheduleEvent, ShutdownEvent, TimelyEvent,
28};
29use timely::worker::Worker as TimelyWorker;
30use tokio::sync::mpsc;
31use uuid::Uuid;
32
33use crate::metrics::StorageMetrics;
34use crate::storage_state::{StorageInstanceContext, Worker};
35
/// Configures a dataflow server.
///
/// Built once in `serve` and then cloned into every timely worker thread via
/// the `ClusterSpec` implementation below.
#[derive(Clone)]
struct Config {
    /// `persist` client cache.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// A process-global handle to tracing configuration.
    pub tracing_handle: Arc<TracingHandle>,
    /// Function to get wall time now.
    pub now: NowFn,
    /// Configuration for source and sink connections.
    pub connection_context: ConnectionContext,
    /// Other configuration for storage instances.
    pub instance_context: StorageInstanceContext,

    /// Metrics for storage.
    pub metrics: StorageMetrics,
    /// Shared rocksdb write buffer manager.
    pub shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
    /// Number of timely workers in this process, for local-index computation.
    pub workers_per_process: usize,
    /// Per-worker writers for forwarding timely logging events to compute,
    /// indexed by local worker index. Slots are `None` when logging is
    /// disabled, and are `take()`n by `run_worker` so each writer is
    /// installed at most once.
    pub timely_log_writers: Arc<Mutex<Vec<Option<TimelyLogWriter>>>>,
}
62
/// Per-worker writer handle for forwarding timely logging events to compute.
///
/// Batches of `(Duration, TimelyEvent)` pairs are pushed into the `EventLink`
/// timestamped with `mz_repr::Timestamp` milliseconds; the consuming end is
/// held by compute's logging dataflow.
pub(crate) type TimelyLogWriter = Arc<EventLink<mz_repr::Timestamp, Vec<(Duration, TimelyEvent)>>>;
65
66/// Initiates a timely dataflow computation, processing storage commands.
67pub async fn serve(
68    timely_config: TimelyConfig,
69    metrics_registry: &MetricsRegistry,
70    persist_clients: Arc<PersistClientCache>,
71    txns_ctx: TxnsContext,
72    tracing_handle: Arc<TracingHandle>,
73    now: NowFn,
74    connection_context: ConnectionContext,
75    instance_context: StorageInstanceContext,
76    timely_log_writers: Vec<TimelyLogWriter>,
77) -> Result<impl Fn() -> Box<dyn StorageClient> + use<>, anyhow::Error> {
78    let workers_per_process = timely_config.workers;
79    // Normalize the log-writer vec to exactly one slot per worker in this process.
80    // Empty input means logging is disabled; pad with `None` so index-based access is
81    // always in bounds.
82    let timely_log_writers = if timely_log_writers.is_empty() {
83        (0..workers_per_process).map(|_| None).collect()
84    } else {
85        assert_eq!(timely_log_writers.len(), workers_per_process);
86        timely_log_writers.into_iter().map(Some).collect()
87    };
88    let config = Config {
89        persist_clients,
90        txns_ctx,
91        tracing_handle,
92        now,
93        connection_context,
94        instance_context,
95        metrics: StorageMetrics::register_with(metrics_registry),
96        // The shared RocksDB `WriteBufferManager` is shared between the workers.
97        // It protects (behind a shared mutex) a `Weak` that will be upgraded and shared when the
98        // first worker attempts to initialize it.
99        shared_rocksdb_write_buffer_manager: Default::default(),
100        workers_per_process,
101        timely_log_writers: Arc::new(Mutex::new(timely_log_writers)),
102    };
103    let tokio_executor = tokio::runtime::Handle::current();
104
105    let timely_container = config.build_cluster(timely_config, tokio_executor).await?;
106    let timely_container = Arc::new(Mutex::new(timely_container));
107
108    let client_builder = move || {
109        let client = ClusterClient::new(Arc::clone(&timely_container));
110        let client: Box<dyn StorageClient> = Box::new(client);
111        client
112    };
113
114    Ok(client_builder)
115}
116
impl ClusterSpec for Config {
    type Command = StorageCommand;
    type Response = StorageResponse;

    const NAME: &str = "storage";

    /// Runs one storage worker on the given timely worker thread.
    ///
    /// If a timely log writer was provided for this worker's local index,
    /// first registers a "timely" logger that batches this runtime's logging
    /// events (filtered and ID-remapped) and forwards them to compute's
    /// logging dataflow. Then hands control to the storage `Worker` event
    /// loop via `run()`.
    fn run_worker(
        &self,
        timely_worker: &mut TimelyWorker,
        client_rx: mpsc::UnboundedReceiver<(
            Uuid,
            mpsc::UnboundedReceiver<StorageCommand>,
            mpsc::UnboundedSender<StorageResponse>,
        )>,
    ) {
        // Register a timely logger that forwards events to the compute logging dataflow.
        // Assign by local worker index so storage worker x matches compute worker x.
        let local_index = timely_worker.index() % self.workers_per_process;
        // `take()` ensures each writer is installed at most once; subsequent
        // lookups at this index find `None`.
        let writer = self.timely_log_writers.lock().unwrap()[local_index].take();
        if let Some(writer) = writer {
            use timely::dataflow::operators::capture::{Event, EventPusher};
            use timely::logging::TimelyEventBuilder;

            // We use an approach similar to compute's logging: wrap the writer in
            // a BatchLogger that translates Logger callbacks into Event pushes,
            // then register the Logger with timely's log_register.
            let interval_ms = 1000u128; // 1 second batching interval
            // Current logical time of the event stream, in milliseconds.
            let mut time_ms = mz_repr::Timestamp::from(0u64);
            let mut event_pusher = writer;
            let now = std::time::Instant::now();
            let start_offset = std::time::SystemTime::now()
                .duration_since(std::time::SystemTime::UNIX_EPOCH)
                .expect("Failed to get duration since Unix epoch");

            // The action closure is invoked by timely with `Some(data)` when a
            // batch of events is flushed, and with `None` to signal that time
            // has advanced.
            let logger = timely::logging_core::Logger::<TimelyEventBuilder>::new(
                now,
                start_offset,
                move |time: &std::time::Duration,
                      data: &mut Option<Vec<(std::time::Duration, TimelyEvent)>>| {
                    if let Some(mut data) = data.take() {
                        // Filter park/unpark events and remap IDs before handing events
                        // off to compute. Compute's park tracking assumes a single
                        // timely runtime; mixing in storage's park events would break
                        // it. Remapping ensures storage operator/channel IDs don't
                        // collide with compute's.
                        data.retain_mut(|(_, event)| {
                            if matches!(event, TimelyEvent::Park(_)) {
                                return false;
                            }
                            remap_timely_event_ids(event);
                            true
                        });
                        event_pusher.push(Event::Messages(time_ms, data));
                    } else {
                        // Advance progress: round the wall-clock time up to the
                        // next batching-interval boundary.
                        let new_time_ms: u64 = (((time.as_millis() / interval_ms) + 1)
                            * interval_ms)
                            .try_into()
                            .expect("must fit");
                        let new_time_ms = mz_repr::Timestamp::from(new_time_ms);
                        if time_ms < new_time_ms {
                            // Swap the capability: +1 at the new frontier, -1 at
                            // the old one.
                            event_pusher
                                .push(Event::Progress(vec![(new_time_ms, 1), (time_ms, -1)]));
                            time_ms = new_time_ms;
                        }
                    }
                },
            );

            // `log_register()` returns `None` when logging is disabled for this
            // worker; in that case the logger is silently dropped.
            if let Some(mut register) = timely_worker.log_register() {
                register.insert_logger("timely", logger);
            }
        }

        // Run the storage worker event loop; this does not return until the
        // worker shuts down.
        Worker::new(
            timely_worker,
            client_rx,
            self.metrics.clone(),
            self.now.clone(),
            self.connection_context.clone(),
            self.instance_context.clone(),
            Arc::clone(&self.persist_clients),
            self.txns_ctx.clone(),
            Arc::clone(&self.tracing_handle),
            self.shared_rocksdb_write_buffer_manager.clone(),
        )
        .run();
    }
}
206
/// Offset added to storage operator/channel IDs to avoid collisions with compute IDs.
///
/// Large enough that compute IDs (which start from 0 and grow) will never reach it,
/// but small enough to be representable as a `u64` with room for many storage operators.
///
/// NOTE(review): `1 << 48` assumes a 64-bit `usize`; this would overflow on a
/// 32-bit target.
const STORAGE_ID_OFFSET: usize = 1 << 48;
212
213/// Remaps operator, channel, and address IDs in a `TimelyEvent` so that events
214/// forwarded to compute's logging dataflow don't collide with compute's IDs.
215fn remap_timely_event_ids(event: &mut TimelyEvent) {
216    match event {
217        TimelyEvent::Operates(OperatesEvent { id, addr, .. }) => {
218            *id = id.wrapping_add(STORAGE_ID_OFFSET);
219            if let Some(first) = addr.first_mut() {
220                *first = first.wrapping_add(STORAGE_ID_OFFSET);
221            }
222        }
223        TimelyEvent::Channels(ChannelsEvent {
224            id,
225            scope_addr,
226            source,
227            target,
228            ..
229        }) => {
230            *id = id.wrapping_add(STORAGE_ID_OFFSET);
231            if let Some(first) = scope_addr.first_mut() {
232                *first = first.wrapping_add(STORAGE_ID_OFFSET);
233            }
234            source.0 = source.0.wrapping_add(STORAGE_ID_OFFSET);
235            target.0 = target.0.wrapping_add(STORAGE_ID_OFFSET);
236        }
237        TimelyEvent::Shutdown(ShutdownEvent { id }) => {
238            *id = id.wrapping_add(STORAGE_ID_OFFSET);
239        }
240        TimelyEvent::Schedule(ScheduleEvent { id, .. }) => {
241            *id = id.wrapping_add(STORAGE_ID_OFFSET);
242        }
243        TimelyEvent::Messages(MessagesEvent { channel, .. }) => {
244            *channel = channel.wrapping_add(STORAGE_ID_OFFSET);
245            // source/target in Messages are worker IDs, not operator IDs.
246        }
247        TimelyEvent::PushProgress(e) => {
248            e.op_id = e.op_id.wrapping_add(STORAGE_ID_OFFSET);
249        }
250        TimelyEvent::CommChannels(e) => {
251            e.identifier = e.identifier.wrapping_add(STORAGE_ID_OFFSET);
252        }
253        TimelyEvent::Park(_) | TimelyEvent::Text(_) => {
254            // No IDs to remap.
255        }
256    }
257}