mz_storage/internal_control.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Types for cluster-internal control messages that can be broadcast to all
7//! workers from individual operators/workers.
8
9use std::cell::RefCell;
10use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::sync::mpsc;
13
14use mz_repr::{GlobalId, Row};
15use mz_rocksdb::config::SharedWriteBufferManager;
16use mz_storage_types::controller::CollectionMetadata;
17use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
18use mz_storage_types::parameters::StorageParameters;
19use mz_storage_types::sinks::StorageSinkDesc;
20use mz_storage_types::sources::IngestionDescription;
21use serde::{Deserialize, Serialize};
22use timely::communication::Allocate;
23use timely::dataflow::channels::pact::{Exchange, Pipeline};
24use timely::dataflow::operators::generic::{OutputHandle, source};
25use timely::dataflow::operators::{Broadcast, Operator};
26use timely::progress::Antichain;
27use timely::scheduling::{Activator, Scheduler};
28use timely::worker::Worker as TimelyWorker;
29
30use crate::statistics::{SinkStatisticsRecord, SourceStatisticsRecord};
31
32/// _Dynamic_ storage instance configuration parameters that are used during dataflow rendering.
33/// Changes to these parameters are applied to `StorageWorker`s in a consistent order
34/// with source and sink creation.
35#[derive(Debug)]
36pub struct DataflowParameters {
37    /// Configuration/tuning for RocksDB. This also contains
38    /// some shared objects, which is why its separate.
39    pub upsert_rocksdb_tuning_config: mz_rocksdb::RocksDBConfig,
40}
41
42impl DataflowParameters {
43    /// Creates a new instance of `DataflowParameters` with given shared rocksdb write buffer manager
44    /// and the cluster memory limit
45    pub fn new(
46        shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
47        cluster_memory_limit: Option<usize>,
48    ) -> Self {
49        Self {
50            upsert_rocksdb_tuning_config: mz_rocksdb::RocksDBConfig::new(
51                shared_rocksdb_write_buffer_manager,
52                cluster_memory_limit,
53            ),
54        }
55    }
56    /// Update the `DataflowParameters` with new configuration.
57    pub fn update(&mut self, storage_parameters: StorageParameters) {
58        self.upsert_rocksdb_tuning_config
59            .apply(storage_parameters.upsert_rocksdb_tuning_config.clone());
60    }
61}
62
63/// Internal commands that can be sent by individual operators/workers that will
64/// be broadcast to all workers. The worker main loop will receive those and act
65/// on them.
66#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
67pub enum InternalStorageCommand {
68    /// Suspend and restart the dataflow identified by the `GlobalId`.
69    SuspendAndRestart {
70        /// The id of the dataflow that should be restarted.
71        id: GlobalId,
72        /// The reason for the restart request.
73        reason: String,
74    },
75    /// Render an ingestion dataflow at the given resumption frontier.
76    CreateIngestionDataflow {
77        /// ID of the ingestion/sourve.
78        id: GlobalId,
79        /// The description of the ingestion/source.
80        ingestion_description: IngestionDescription<CollectionMetadata>,
81        /// The frontier beyond which ingested updates should be uncompacted. Inputs to the
82        /// ingestion are guaranteed to be readable at this frontier.
83        as_of: Antichain<mz_repr::Timestamp>,
84        /// A frontier in the Materialize time domain with the property that all updates not beyond
85        /// it have already been durably ingested.
86        resume_uppers: BTreeMap<GlobalId, Antichain<mz_repr::Timestamp>>,
87        /// A frontier in the source time domain with the property that all updates not beyond it
88        /// have already been durably ingested.
89        source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
90    },
91    /// Render a oneshot ingestion dataflow that fetches data from an external system and stages
92    /// batches in Persist, that can later be appended to the shard.
93    RunOneshotIngestion {
94        /// ID of the running dataflow that is doing the ingestion.
95        ingestion_id: uuid::Uuid,
96        /// ID of the collection we'll create batches for.
97        collection_id: GlobalId,
98        /// Metadata of the collection we'll create batches for.
99        collection_meta: CollectionMetadata,
100        /// Description of the oneshot ingestion.
101        request: OneshotIngestionRequest,
102    },
103    /// Render a sink dataflow.
104    RunSinkDataflow(
105        GlobalId,
106        StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
107    ),
108    /// Drop all state and operators for a dataflow. This is a vec because some
109    /// dataflows have their state spread over multiple IDs (i.e. sources that
110    /// spawn subsources); this means that actions taken in response to this
111    /// command should be permissive about missing state.
112    DropDataflow(Vec<GlobalId>),
113
114    /// Update the configuration for rendering dataflows.
115    UpdateConfiguration {
116        /// The new configuration parameters.
117        storage_parameters: StorageParameters,
118    },
119    /// For moving statistics updates to worker 0.
120    StatisticsUpdate {
121        /// Local statistics, with their epochs.
122        sources: Vec<(usize, SourceStatisticsRecord)>,
123        /// Local statistics, with their epochs.
124        sinks: Vec<(usize, SinkStatisticsRecord)>,
125    },
126}
127
128/// A sender broadcasting [`InternalStorageCommand`]s to all workers.
129#[derive(Clone)]
130pub struct InternalCommandSender {
131    tx: mpsc::Sender<InternalStorageCommand>,
132    activator: Rc<RefCell<Option<Activator>>>,
133}
134
135impl InternalCommandSender {
136    /// Broadcasts the given command to all workers.
137    pub fn send(&self, cmd: InternalStorageCommand) {
138        if self.tx.send(cmd).is_err() {
139            panic!("internal command channel disconnected");
140        }
141
142        self.activator.borrow().as_ref().map(|a| a.activate());
143    }
144}
145
146/// A receiver for [`InternalStorageCommand`]s broadcasted by workers.
147pub struct InternalCommandReceiver {
148    rx: mpsc::Receiver<InternalStorageCommand>,
149}
150
151impl InternalCommandReceiver {
152    /// Returns the next available command, if any.
153    ///
154    /// This returns `None` when there are currently no commands but there might be commands again
155    /// in the future.
156    pub fn try_recv(&self) -> Option<InternalStorageCommand> {
157        match self.rx.try_recv() {
158            Ok(cmd) => Some(cmd),
159            Err(mpsc::TryRecvError::Empty) => None,
160            Err(mpsc::TryRecvError::Disconnected) => {
161                panic!("internal command channel disconnected")
162            }
163        }
164    }
165}
166
167pub(crate) fn setup_command_sequencer<'w, A: Allocate>(
168    timely_worker: &'w mut TimelyWorker<A>,
169) -> (InternalCommandSender, InternalCommandReceiver) {
170    let (input_tx, input_rx) = mpsc::channel();
171    let (output_tx, output_rx) = mpsc::channel();
172    let activator = Rc::new(RefCell::new(None));
173
174    timely_worker.dataflow_named::<(), _, _>("command_sequencer", {
175        let activator = Rc::clone(&activator);
176        move |scope| {
177            // Create a stream of commands received from `input_rx`.
178            //
179            // The output commands are tagged by worker ID and command index, allowing downstream
180            // operators to ensure their correct relative order.
181            let stream = source(scope, "command_sequencer::source", |cap, info| {
182                *activator.borrow_mut() = Some(scope.activator_for(info.address));
183
184                let worker_id = scope.index();
185                let mut cmd_index = 0;
186                let mut capability = Some(cap);
187
188                move |output: &mut OutputHandle<_, _, _>| {
189                    let Some(cap) = &capability else {
190                        return;
191                    };
192
193                    let mut session = output.session(cap);
194                    loop {
195                        match input_rx.try_recv() {
196                            Ok(command) => {
197                                let cmd = IndexedCommand {
198                                    index: cmd_index,
199                                    command,
200                                };
201                                session.give((worker_id, cmd));
202                                cmd_index += 1;
203                            }
204                            Err(mpsc::TryRecvError::Empty) => break,
205                            Err(mpsc::TryRecvError::Disconnected) => {
206                                // Drop our capability to shut down.
207                                capability = None;
208                                break;
209                            }
210                        }
211                    }
212                }
213            });
214
215            // Sequence all commands through a single worker to establish a unique order.
216            //
217            // The output commands are tagged by a command index, allowing downstream operators to
218            // ensure their correct relative order.
219            let stream = stream.unary_frontier(
220                Exchange::new(|_| 0),
221                "command_sequencer::sequencer",
222                |cap, _info| {
223                    let mut cmd_index = 0;
224                    let mut capability = Some(cap);
225
226                    // For each worker, keep an ordered list of pending commands, as well as the
227                    // current index of the next command.
228                    let mut pending_commands = vec![(BTreeSet::new(), 0); scope.peers()];
229
230                    move |input, output: &mut OutputHandle<_, _, _>| {
231                        let Some(cap) = &capability else {
232                            return;
233                        };
234
235                        while let Some((_cap, data)) = input.next() {
236                            for (worker_id, cmd) in data.drain(..) {
237                                pending_commands[worker_id].0.insert(cmd);
238                            }
239                        }
240
241                        let mut session = output.session(cap);
242                        for (commands, next_idx) in &mut pending_commands {
243                            while commands.first().is_some_and(|c| c.index == *next_idx) {
244                                let mut cmd = commands.pop_first().unwrap();
245                                cmd.index = cmd_index;
246                                session.give(cmd);
247
248                                *next_idx += 1;
249                                cmd_index += 1;
250                            }
251                        }
252
253                        if input.frontier().is_empty() {
254                            // Drop our capability to shut down.
255                            capability = None;
256                        }
257                    }
258                },
259            );
260
261            // Broadcast the ordered commands to all workers.
262            let stream = stream.broadcast();
263
264            // Sink the stream back into `output_tx`.
265            stream.sink(Pipeline, "command_sequencer::sink", {
266                // Keep an ordered list of pending commands, as well as the current index of the
267                // next command.
268                let mut pending_commands = BTreeSet::new();
269                let mut next_idx = 0;
270
271                move |input| {
272                    while let Some((_cap, data)) = input.next() {
273                        pending_commands.extend(data.drain(..));
274                    }
275
276                    while pending_commands
277                        .first()
278                        .is_some_and(|c| c.index == next_idx)
279                    {
280                        let cmd = pending_commands.pop_first().unwrap();
281                        let _ = output_tx.send(cmd.command);
282                        next_idx += 1;
283                    }
284                }
285            });
286        }
287    });
288
289    let tx = InternalCommandSender {
290        tx: input_tx,
291        activator,
292    };
293    let rx = InternalCommandReceiver { rx: output_rx };
294
295    (tx, rx)
296}
297
298// An [`InternalStorageCommand`] tagged with an index.
299//
300// This is a `(u64, InternalStorageCommand)` in spirit, but implements `Ord` (which
301// `InternalStorageCommand` doesn't) by looking only at the index.
302#[derive(Clone, Debug, Serialize, Deserialize)]
303struct IndexedCommand {
304    index: u64,
305    command: InternalStorageCommand,
306}
307
308impl PartialEq for IndexedCommand {
309    fn eq(&self, other: &Self) -> bool {
310        self.cmp(other).is_eq()
311    }
312}
313
314impl Eq for IndexedCommand {}
315
316impl PartialOrd for IndexedCommand {
317    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
318        Some(self.cmp(other))
319    }
320}
321
322impl Ord for IndexedCommand {
323    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
324        self.index.cmp(&other.index)
325    }
326}