Skip to main content

mz_storage/
internal_control.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Types for cluster-internal control messages that can be broadcast to all
7//! workers from individual operators/workers.
8
9use std::cell::RefCell;
10use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::sync::mpsc;
13
14use mz_repr::{GlobalId, Row};
15use mz_rocksdb::config::SharedWriteBufferManager;
16use mz_storage_types::controller::CollectionMetadata;
17use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
18use mz_storage_types::parameters::StorageParameters;
19use mz_storage_types::sinks::StorageSinkDesc;
20use mz_storage_types::sources::IngestionDescription;
21use mz_timely_util::scope_label::ScopeExt;
22use serde::{Deserialize, Serialize};
23use timely::communication::Allocate;
24use timely::dataflow::channels::pact::{Exchange, Pipeline};
25use timely::dataflow::operators::Operator;
26use timely::dataflow::operators::generic::source;
27use timely::dataflow::operators::vec::Broadcast;
28use timely::progress::Antichain;
29use timely::scheduling::{Activator, Scheduler};
30use timely::worker::{AsWorker, Worker as TimelyWorker};
31
32use crate::statistics::{SinkStatisticsRecord, SourceStatisticsRecord};
33
/// _Dynamic_ storage instance configuration parameters that are used during dataflow rendering.
/// Changes to these parameters are applied to `StorageWorker`s in a consistent order
/// with source and sink creation.
#[derive(Debug)]
pub struct DataflowParameters {
    /// Configuration/tuning for RocksDB. This also contains
    /// some shared objects (e.g. the shared write buffer manager passed to
    /// [`DataflowParameters::new`]), which is why it's separate.
    pub upsert_rocksdb_tuning_config: mz_rocksdb::RocksDBConfig,
}
43
44impl DataflowParameters {
45    /// Creates a new instance of `DataflowParameters` with given shared rocksdb write buffer manager
46    /// and the cluster memory limit
47    pub fn new(
48        shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
49        cluster_memory_limit: Option<usize>,
50    ) -> Self {
51        Self {
52            upsert_rocksdb_tuning_config: mz_rocksdb::RocksDBConfig::new(
53                shared_rocksdb_write_buffer_manager,
54                cluster_memory_limit,
55            ),
56        }
57    }
58    /// Update the `DataflowParameters` with new configuration.
59    pub fn update(&mut self, storage_parameters: StorageParameters) {
60        self.upsert_rocksdb_tuning_config
61            .apply(storage_parameters.upsert_rocksdb_tuning_config.clone());
62    }
63}
64
/// Internal commands that can be sent by individual operators/workers that will
/// be broadcast to all workers. The worker main loop will receive those and act
/// on them.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum InternalStorageCommand {
    /// Suspend and restart the dataflow identified by the `GlobalId`.
    SuspendAndRestart {
        /// The id of the dataflow that should be restarted.
        id: GlobalId,
        /// The reason for the restart request.
        reason: String,
    },
    /// Render an ingestion dataflow at the given resumption frontier.
    CreateIngestionDataflow {
        /// ID of the ingestion/source.
        id: GlobalId,
        /// The description of the ingestion/source.
        ingestion_description: IngestionDescription<CollectionMetadata>,
        /// The frontier beyond which ingested updates should be uncompacted. Inputs to the
        /// ingestion are guaranteed to be readable at this frontier.
        as_of: Antichain<mz_repr::Timestamp>,
        /// A frontier in the Materialize time domain with the property that all updates not beyond
        /// it have already been durably ingested.
        resume_uppers: BTreeMap<GlobalId, Antichain<mz_repr::Timestamp>>,
        /// A frontier in the source time domain with the property that all updates not beyond it
        /// have already been durably ingested.
        source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
    },
    /// Render a oneshot ingestion dataflow that fetches data from an external system and stages
    /// batches in Persist, that can later be appended to the shard.
    RunOneshotIngestion {
        /// ID of the running dataflow that is doing the ingestion.
        ingestion_id: uuid::Uuid,
        /// ID of the collection we'll create batches for.
        collection_id: GlobalId,
        /// Metadata of the collection we'll create batches for.
        collection_meta: CollectionMetadata,
        /// Description of the oneshot ingestion.
        request: OneshotIngestionRequest,
    },
    /// Render a sink dataflow.
    RunSinkDataflow(
        GlobalId,
        StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
    ),
    /// Drop all state and operators for a dataflow. This is a vec because some
    /// dataflows have their state spread over multiple IDs (i.e. sources that
    /// spawn subsources); this means that actions taken in response to this
    /// command should be permissive about missing state.
    DropDataflow(Vec<GlobalId>),

    /// Update the configuration for rendering dataflows.
    UpdateConfiguration {
        /// The new configuration parameters.
        storage_parameters: StorageParameters,
    },
    /// For moving statistics updates to worker 0.
    StatisticsUpdate {
        /// Local statistics, with their epochs.
        sources: Vec<(usize, SourceStatisticsRecord)>,
        /// Local statistics, with their epochs.
        sinks: Vec<(usize, SinkStatisticsRecord)>,
    },
}
129
/// A sender broadcasting [`InternalStorageCommand`]s to all workers.
#[derive(Clone)]
pub struct InternalCommandSender {
    // Input into the `command_sequencer` dataflow; commands sent here get sequenced and
    // broadcast to all workers.
    tx: mpsc::Sender<InternalStorageCommand>,
    // Activator for the `command_sequencer` source operator. This is `None` until the
    // dataflow has been rendered and the source operator has registered its activator.
    activator: Rc<RefCell<Option<Activator>>>,
}
136
137impl InternalCommandSender {
138    /// Broadcasts the given command to all workers.
139    pub fn send(&self, cmd: InternalStorageCommand) {
140        if self.tx.send(cmd).is_err() {
141            panic!("internal command channel disconnected");
142        }
143
144        self.activator.borrow().as_ref().map(|a| a.activate());
145    }
146}
147
/// A receiver for [`InternalStorageCommand`]s broadcasted by workers.
pub struct InternalCommandReceiver {
    // Output of the `command_sequencer` dataflow: commands arrive here in the globally
    // sequenced order established by that dataflow.
    rx: mpsc::Receiver<InternalStorageCommand>,
}
152
153impl InternalCommandReceiver {
154    /// Returns the next available command, if any.
155    ///
156    /// This returns `None` when there are currently no commands but there might be commands again
157    /// in the future.
158    pub fn try_recv(&self) -> Option<InternalStorageCommand> {
159        match self.rx.try_recv() {
160            Ok(cmd) => Some(cmd),
161            Err(mpsc::TryRecvError::Empty) => None,
162            Err(mpsc::TryRecvError::Disconnected) => {
163                panic!("internal command channel disconnected")
164            }
165        }
166    }
167}
168
/// Sets up the `command_sequencer` dataflow on the given timely worker and returns a
/// connected [`InternalCommandSender`]/[`InternalCommandReceiver`] pair.
///
/// The dataflow ingests commands sent by any worker, routes them all through a single worker
/// (worker 0) to establish one global order, and broadcasts them back to every worker in that
/// order, where they can be read off the returned receiver.
pub(crate) fn setup_command_sequencer<'w, A: Allocate>(
    timely_worker: &'w mut TimelyWorker<A>,
) -> (InternalCommandSender, InternalCommandReceiver) {
    // `input_*` feeds commands into the dataflow, `output_*` carries them back out.
    let (input_tx, input_rx) = mpsc::channel();
    let (output_tx, output_rx) = mpsc::channel();
    // Filled in by the source operator below, once it knows its own address.
    let activator = Rc::new(RefCell::new(None));

    timely_worker.dataflow_named::<(), _, _>("command_sequencer", {
        let activator = Rc::clone(&activator);
        move |scope| {
            let scope = &mut scope.with_label();
            // Create a stream of commands received from `input_rx`.
            //
            // The output commands are tagged by worker ID and command index, allowing downstream
            // operators to ensure their correct relative order.
            let stream = source(scope, "command_sequencer::source", |cap, info| {
                // Register our activator so `InternalCommandSender::send` can schedule this
                // operator whenever a new command is enqueued.
                *activator.borrow_mut() = Some(scope.activator_for(info.address));

                let worker_id = scope.index();
                let mut cmd_index = 0;
                // Holding a capability keeps the dataflow alive; dropping it signals shutdown.
                let mut capability = Some(cap);

                move |output| {
                    let Some(cap) = capability.clone() else {
                        return;
                    };

                    let mut session = output.session(&cap);
                    // Drain all currently queued commands, tagging each with this worker's ID
                    // and a per-worker sequence number.
                    loop {
                        match input_rx.try_recv() {
                            Ok(command) => {
                                let cmd = IndexedCommand {
                                    index: cmd_index,
                                    command,
                                };
                                session.give((worker_id, cmd));
                                cmd_index += 1;
                            }
                            Err(mpsc::TryRecvError::Empty) => break,
                            Err(mpsc::TryRecvError::Disconnected) => {
                                // Drop our capability to shut down.
                                capability = None;
                                break;
                            }
                        }
                    }
                }
            });

            // Sequence all commands through a single worker to establish a unique order.
            //
            // The output commands are tagged by a command index, allowing downstream operators to
            // ensure their correct relative order.
            let stream = stream.unary_frontier(
                // Route everything to worker 0, which assigns the global order.
                Exchange::new(|_| 0),
                "command_sequencer::sequencer",
                |cap, _info| {
                    // Global sequence number assigned to outgoing commands.
                    let mut cmd_index = 0;
                    let mut capability = Some(cap);

                    // For each worker, keep an ordered list of pending commands, as well as the
                    // current index of the next command.
                    let mut pending_commands = vec![(BTreeSet::new(), 0); scope.peers()];

                    move |(input, frontier), output| {
                        let Some(cap) = capability.clone() else {
                            return;
                        };

                        input.for_each(|_time, data| {
                            for (worker_id, cmd) in data.drain(..) {
                                pending_commands[worker_id].0.insert(cmd);
                            }
                        });

                        let mut session = output.session(&cap);
                        // Emit each worker's commands in per-worker index order, re-tagging
                        // them with the single global index. Commands that arrived out of
                        // order stay buffered until their predecessors have been emitted.
                        for (commands, next_idx) in &mut pending_commands {
                            while commands.first().is_some_and(|c| c.index == *next_idx) {
                                let mut cmd = commands.pop_first().unwrap();
                                cmd.index = cmd_index;
                                session.give(cmd);

                                *next_idx += 1;
                                cmd_index += 1;
                            }
                        }

                        // NOTE(review): explicitly drops the session here; appears intended to
                        // flush outputs before the capability is possibly dropped below —
                        // confirm, as it would also be dropped at end of scope.
                        let _ = session;

                        if frontier.is_empty() {
                            // Drop our capability to shut down.
                            capability = None;
                        }
                    }
                },
            );

            // Broadcast the ordered commands to all workers.
            let stream = stream.broadcast();

            // Sink the stream back into `output_tx`.
            stream.sink(Pipeline, "command_sequencer::sink", {
                // Keep an ordered list of pending commands, as well as the current index of the
                // next command.
                let mut pending_commands = BTreeSet::new();
                let mut next_idx = 0;

                move |(input, _frontier)| {
                    input.for_each(|_time, data| {
                        pending_commands.extend(data.drain(..));
                    });

                    // Forward commands strictly in global-index order; a send error only means
                    // the receiver was dropped, which is fine to ignore here.
                    while pending_commands
                        .first()
                        .is_some_and(|c| c.index == next_idx)
                    {
                        let cmd = pending_commands.pop_first().unwrap();
                        let _ = output_tx.send(cmd.command);
                        next_idx += 1;
                    }
                }
            });
        }
    });

    let tx = InternalCommandSender {
        tx: input_tx,
        activator,
    };
    let rx = InternalCommandReceiver { rx: output_rx };

    (tx, rx)
}
302
// An [`InternalStorageCommand`] tagged with an index.
//
// This is a `(u64, InternalStorageCommand)` in spirit, but implements `Ord` (which
// `InternalStorageCommand` doesn't) by looking only at the index.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct IndexedCommand {
    // Sequence number; the sole basis for equality and ordering (see the impls below).
    index: u64,
    command: InternalStorageCommand,
}
312
313impl PartialEq for IndexedCommand {
314    fn eq(&self, other: &Self) -> bool {
315        self.cmp(other).is_eq()
316    }
317}
318
319impl Eq for IndexedCommand {}
320
321impl PartialOrd for IndexedCommand {
322    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
323        Some(self.cmp(other))
324    }
325}
326
327impl Ord for IndexedCommand {
328    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
329        self.index.cmp(&other.index)
330    }
331}