// mz_storage/internal_control.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

//! Types for cluster-internal control messages that can be broadcast to all
//! workers from individual operators/workers.
use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::rc::Rc;
use std::sync::mpsc;

use mz_repr::{GlobalId, Row};
use mz_rocksdb::config::SharedWriteBufferManager;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::IngestionDescription;
use mz_timely_util::scope_label::ScopeExt;
use serde::{Deserialize, Serialize};
use timely::communication::Allocate;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::generic::source;
use timely::dataflow::operators::{Broadcast, Operator};
use timely::progress::Antichain;
use timely::scheduling::{Activator, Scheduler};
use timely::worker::{AsWorker, Worker as TimelyWorker};

use crate::statistics::{SinkStatisticsRecord, SourceStatisticsRecord};
32
/// _Dynamic_ storage instance configuration parameters that are used during dataflow rendering.
/// Changes to these parameters are applied to `StorageWorker`s in a consistent order
/// with source and sink creation.
#[derive(Debug)]
pub struct DataflowParameters {
    /// Configuration/tuning for RocksDB. This also contains
    /// some shared objects, which is why it's kept separate.
    pub upsert_rocksdb_tuning_config: mz_rocksdb::RocksDBConfig,
}
42
43impl DataflowParameters {
44    /// Creates a new instance of `DataflowParameters` with given shared rocksdb write buffer manager
45    /// and the cluster memory limit
46    pub fn new(
47        shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
48        cluster_memory_limit: Option<usize>,
49    ) -> Self {
50        Self {
51            upsert_rocksdb_tuning_config: mz_rocksdb::RocksDBConfig::new(
52                shared_rocksdb_write_buffer_manager,
53                cluster_memory_limit,
54            ),
55        }
56    }
57    /// Update the `DataflowParameters` with new configuration.
58    pub fn update(&mut self, storage_parameters: StorageParameters) {
59        self.upsert_rocksdb_tuning_config
60            .apply(storage_parameters.upsert_rocksdb_tuning_config.clone());
61    }
62}
63
/// Internal commands that can be sent by individual operators/workers that will
/// be broadcast to all workers. The worker main loop will receive those and act
/// on them.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum InternalStorageCommand {
    /// Suspend and restart the dataflow identified by the `GlobalId`.
    SuspendAndRestart {
        /// The id of the dataflow that should be restarted.
        id: GlobalId,
        /// The reason for the restart request.
        reason: String,
    },
    /// Render an ingestion dataflow at the given resumption frontier.
    CreateIngestionDataflow {
        /// ID of the ingestion/source.
        id: GlobalId,
        /// The description of the ingestion/source.
        ingestion_description: IngestionDescription<CollectionMetadata>,
        /// The frontier beyond which ingested updates should be uncompacted. Inputs to the
        /// ingestion are guaranteed to be readable at this frontier.
        as_of: Antichain<mz_repr::Timestamp>,
        /// A frontier in the Materialize time domain with the property that all updates not beyond
        /// it have already been durably ingested.
        resume_uppers: BTreeMap<GlobalId, Antichain<mz_repr::Timestamp>>,
        /// A frontier in the source time domain with the property that all updates not beyond it
        /// have already been durably ingested.
        source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
    },
    /// Render a oneshot ingestion dataflow that fetches data from an external system and stages
    /// batches in Persist, that can later be appended to the shard.
    RunOneshotIngestion {
        /// ID of the running dataflow that is doing the ingestion.
        ingestion_id: uuid::Uuid,
        /// ID of the collection we'll create batches for.
        collection_id: GlobalId,
        /// Metadata of the collection we'll create batches for.
        collection_meta: CollectionMetadata,
        /// Description of the oneshot ingestion.
        request: OneshotIngestionRequest,
    },
    /// Render a sink dataflow.
    RunSinkDataflow(
        GlobalId,
        StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
    ),
    /// Drop all state and operators for a dataflow. This is a vec because some
    /// dataflows have their state spread over multiple IDs (i.e. sources that
    /// spawn subsources); this means that actions taken in response to this
    /// command should be permissive about missing state.
    DropDataflow(Vec<GlobalId>),

    /// Update the configuration for rendering dataflows.
    UpdateConfiguration {
        /// The new configuration parameters.
        storage_parameters: StorageParameters,
    },
    /// For moving statistics updates to worker 0.
    StatisticsUpdate {
        /// Local statistics, with their epochs.
        sources: Vec<(usize, SourceStatisticsRecord)>,
        /// Local statistics, with their epochs.
        sinks: Vec<(usize, SinkStatisticsRecord)>,
    },
}
128
/// A sender broadcasting [`InternalStorageCommand`]s to all workers.
#[derive(Clone)]
pub struct InternalCommandSender {
    // Channel feeding commands into the sequencer dataflow's source operator.
    tx: mpsc::Sender<InternalStorageCommand>,
    // Activator for the sequencer's source operator. Filled in once the
    // sequencer dataflow has been built, so sends can wake it up promptly.
    activator: Rc<RefCell<Option<Activator>>>,
}
135
136impl InternalCommandSender {
137    /// Broadcasts the given command to all workers.
138    pub fn send(&self, cmd: InternalStorageCommand) {
139        if self.tx.send(cmd).is_err() {
140            panic!("internal command channel disconnected");
141        }
142
143        self.activator.borrow().as_ref().map(|a| a.activate());
144    }
145}
146
/// A receiver for [`InternalStorageCommand`]s broadcasted by workers.
pub struct InternalCommandReceiver {
    // Channel fed by the sequencer dataflow's sink operator.
    rx: mpsc::Receiver<InternalStorageCommand>,
}
151
152impl InternalCommandReceiver {
153    /// Returns the next available command, if any.
154    ///
155    /// This returns `None` when there are currently no commands but there might be commands again
156    /// in the future.
157    pub fn try_recv(&self) -> Option<InternalStorageCommand> {
158        match self.rx.try_recv() {
159            Ok(cmd) => Some(cmd),
160            Err(mpsc::TryRecvError::Empty) => None,
161            Err(mpsc::TryRecvError::Disconnected) => {
162                panic!("internal command channel disconnected")
163            }
164        }
165    }
166}
167
/// Builds the "command sequencer" dataflow on the given worker and returns the
/// sender/receiver pair connected to it.
///
/// The dataflow funnels commands from all workers through worker 0 (via the
/// `Exchange` to key 0), stamps them there with a single global index to
/// establish one total order, broadcasts them back to every worker, and
/// finally emits them in that order through the returned
/// [`InternalCommandReceiver`]. The returned [`InternalCommandSender`] feeds
/// commands into this worker's source operator and can activate it.
pub(crate) fn setup_command_sequencer<'w, A: Allocate>(
    timely_worker: &'w mut TimelyWorker<A>,
) -> (InternalCommandSender, InternalCommandReceiver) {
    // `input_*` carries commands into the dataflow, `output_*` carries the
    // globally ordered commands back out.
    let (input_tx, input_rx) = mpsc::channel();
    let (output_tx, output_rx) = mpsc::channel();
    let activator = Rc::new(RefCell::new(None));

    timely_worker.dataflow_named::<(), _, _>("command_sequencer", {
        let activator = Rc::clone(&activator);
        move |scope| {
            let scope = &mut scope.with_label();
            // Create a stream of commands received from `input_rx`.
            //
            // The output commands are tagged by worker ID and command index, allowing downstream
            // operators to ensure their correct relative order.
            let stream = source(scope, "command_sequencer::source", |cap, info| {
                // Register the activator so `InternalCommandSender::send` can
                // schedule this operator.
                *activator.borrow_mut() = Some(scope.activator_for(info.address));

                let worker_id = scope.index();
                let mut cmd_index = 0;
                let mut capability = Some(cap);

                move |output| {
                    // A dropped capability means we have shut down; produce nothing.
                    let Some(cap) = capability.clone() else {
                        return;
                    };

                    let mut session = output.session(&cap);
                    loop {
                        match input_rx.try_recv() {
                            Ok(command) => {
                                let cmd = IndexedCommand {
                                    index: cmd_index,
                                    command,
                                };
                                session.give((worker_id, cmd));
                                cmd_index += 1;
                            }
                            Err(mpsc::TryRecvError::Empty) => break,
                            Err(mpsc::TryRecvError::Disconnected) => {
                                // Drop our capability to shut down.
                                capability = None;
                                break;
                            }
                        }
                    }
                }
            });

            // Sequence all commands through a single worker to establish a unique order.
            //
            // The output commands are tagged by a command index, allowing downstream operators to
            // ensure their correct relative order.
            let stream = stream.unary_frontier(
                // Route everything to worker 0, which assigns the global order.
                Exchange::new(|_| 0),
                "command_sequencer::sequencer",
                |cap, _info| {
                    let mut cmd_index = 0;
                    let mut capability = Some(cap);

                    // For each worker, keep an ordered list of pending commands, as well as the
                    // current index of the next command.
                    let mut pending_commands = vec![(BTreeSet::new(), 0); scope.peers()];

                    move |(input, frontier), output| {
                        let Some(cap) = capability.clone() else {
                            return;
                        };

                        input.for_each(|_time, data| {
                            for (worker_id, cmd) in data.drain(..) {
                                pending_commands[worker_id].0.insert(cmd);
                            }
                        });

                        let mut session = output.session(&cap);
                        for (commands, next_idx) in &mut pending_commands {
                            // Emit each worker's commands strictly in their per-worker
                            // order, re-stamping them with the global index.
                            while commands.first().is_some_and(|c| c.index == *next_idx) {
                                let mut cmd = commands.pop_first().unwrap();
                                cmd.index = cmd_index;
                                session.give(cmd);

                                *next_idx += 1;
                                cmd_index += 1;
                            }
                        }

                        let _ = session;

                        if frontier.is_empty() {
                            // Drop our capability to shut down.
                            capability = None;
                        }
                    }
                },
            );

            // Broadcast the ordered commands to all workers.
            let stream = stream.broadcast();

            // Sink the stream back into `output_tx`.
            stream.sink(Pipeline, "command_sequencer::sink", {
                // Keep an ordered list of pending commands, as well as the current index of the
                // next command.
                let mut pending_commands = BTreeSet::new();
                let mut next_idx = 0;

                move |(input, _frontier)| {
                    input.for_each(|_time, data| {
                        pending_commands.extend(data.drain(..));
                    });

                    // Forward commands in global-index order; a send error means the
                    // receiver was dropped, which is fine during shutdown.
                    while pending_commands
                        .first()
                        .is_some_and(|c| c.index == next_idx)
                    {
                        let cmd = pending_commands.pop_first().unwrap();
                        let _ = output_tx.send(cmd.command);
                        next_idx += 1;
                    }
                }
            });
        }
    });

    let tx = InternalCommandSender {
        tx: input_tx,
        activator,
    };
    let rx = InternalCommandReceiver { rx: output_rx };

    (tx, rx)
}
301
// An [`InternalStorageCommand`] tagged with an index.
//
// This is a `(u64, InternalStorageCommand)` in spirit, but implements `Ord` (which
// `InternalStorageCommand` doesn't) by looking only at the index.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct IndexedCommand {
    // Position of the command in its originating order (per-worker first,
    // then global after re-stamping by the sequencer).
    index: u64,
    // The actual command payload; not considered by the `Ord` impl.
    command: InternalStorageCommand,
}
311
312impl PartialEq for IndexedCommand {
313    fn eq(&self, other: &Self) -> bool {
314        self.cmp(other).is_eq()
315    }
316}
317
318impl Eq for IndexedCommand {}
319
320impl PartialOrd for IndexedCommand {
321    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
322        Some(self.cmp(other))
323    }
324}
325
326impl Ord for IndexedCommand {
327    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
328        self.index.cmp(&other.index)
329    }
330}