mz_storage/internal_control.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

//! Types for cluster-internal control messages that can be broadcast to all
//! workers from individual operators/workers.

use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::rc::Rc;
use std::sync::mpsc;

use mz_repr::{GlobalId, Row};
use mz_rocksdb::config::SharedWriteBufferManager;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::IngestionDescription;
use mz_timely_util::scope_label::ScopeExt;
use serde::{Deserialize, Serialize};
use timely::communication::Allocate;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::generic::source;
use timely::dataflow::operators::{Broadcast, Operator};
use timely::progress::Antichain;
use timely::scheduling::{Activator, Scheduler};
use timely::worker::{AsWorker, Worker as TimelyWorker};

use crate::statistics::{SinkStatisticsRecord, SourceStatisticsRecord};

/// _Dynamic_ storage instance configuration parameters that are used during dataflow rendering.
/// Changes to these parameters are applied to `StorageWorker`s in a consistent order
/// with source and sink creation.
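///
/// A minimal usage sketch (not a doctest; `shared_wbm` and `storage_parameters` are assumed
/// to be in scope):
///
/// ```ignore
/// // `shared_wbm: SharedWriteBufferManager` is shared RocksDB state provided by the cluster;
/// // the 4 GiB memory limit is a made-up value for illustration.
/// let mut params = DataflowParameters::new(shared_wbm, Some(4 << 30));
/// // Later, fold dynamically updated storage parameters into the RocksDB tuning config.
/// params.update(storage_parameters);
/// ```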
#[derive(Debug)]
pub struct DataflowParameters {
    /// Configuration/tuning for RocksDB. This also contains
    /// some shared objects, which is why it's separate.
    pub upsert_rocksdb_tuning_config: mz_rocksdb::RocksDBConfig,
}

impl DataflowParameters {
    /// Creates a new instance of `DataflowParameters` with the given shared RocksDB write
    /// buffer manager and the cluster memory limit.
    pub fn new(
        shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
        cluster_memory_limit: Option<usize>,
    ) -> Self {
        Self {
            upsert_rocksdb_tuning_config: mz_rocksdb::RocksDBConfig::new(
                shared_rocksdb_write_buffer_manager,
                cluster_memory_limit,
            ),
        }
    }

    /// Updates the `DataflowParameters` with new configuration.
    pub fn update(&mut self, storage_parameters: StorageParameters) {
        self.upsert_rocksdb_tuning_config
            .apply(storage_parameters.upsert_rocksdb_tuning_config.clone());
    }
}

/// Internal commands that can be sent by individual operators/workers and that will be
/// broadcast to all workers. The worker main loop receives these and acts on them.
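///
/// As an illustration (a sketch, not taken from an actual caller; `internal_cmd_tx` and
/// `ingestion_id` are assumed names), an operator that hits an unrecoverable error might
/// request a restart of its dataflow through an [`InternalCommandSender`]:
///
/// ```ignore
/// internal_cmd_tx.send(InternalStorageCommand::SuspendAndRestart {
///     id: ingestion_id,
///     reason: "source connection lost".to_string(),
/// });
/// ```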
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum InternalStorageCommand {
    /// Suspend and restart the dataflow identified by the `GlobalId`.
    SuspendAndRestart {
        /// The id of the dataflow that should be restarted.
        id: GlobalId,
        /// The reason for the restart request.
        reason: String,
    },
    /// Render an ingestion dataflow at the given resumption frontier.
    CreateIngestionDataflow {
        /// ID of the ingestion/source.
        id: GlobalId,
        /// The description of the ingestion/source.
        ingestion_description: IngestionDescription<CollectionMetadata>,
        /// The frontier beyond which ingested updates should be uncompacted. Inputs to the
        /// ingestion are guaranteed to be readable at this frontier.
        as_of: Antichain<mz_repr::Timestamp>,
        /// A frontier in the Materialize time domain with the property that all updates not beyond
        /// it have already been durably ingested.
        resume_uppers: BTreeMap<GlobalId, Antichain<mz_repr::Timestamp>>,
        /// A frontier in the source time domain with the property that all updates not beyond it
        /// have already been durably ingested.
        source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
    },
    /// Render a oneshot ingestion dataflow that fetches data from an external system and stages
    /// batches in Persist, which can later be appended to the shard.
    RunOneshotIngestion {
        /// ID of the running dataflow that is doing the ingestion.
        ingestion_id: uuid::Uuid,
        /// ID of the collection we'll create batches for.
        collection_id: GlobalId,
        /// Metadata of the collection we'll create batches for.
        collection_meta: CollectionMetadata,
        /// Description of the oneshot ingestion.
        request: OneshotIngestionRequest,
    },
    /// Render a sink dataflow.
    RunSinkDataflow(
        GlobalId,
        StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
    ),
    /// Drop all state and operators for a dataflow. This is a vec because some
    /// dataflows have their state spread over multiple IDs (e.g. sources that
    /// spawn subsources); this means that actions taken in response to this
    /// command should be permissive about missing state.
    DropDataflow(Vec<GlobalId>),

    /// Update the configuration for rendering dataflows.
    UpdateConfiguration {
        /// The new configuration parameters.
        storage_parameters: StorageParameters,
    },
    /// For moving statistics updates to worker 0.
    StatisticsUpdate {
        /// Local statistics, with their epochs.
        sources: Vec<(usize, SourceStatisticsRecord)>,
        /// Local statistics, with their epochs.
        sinks: Vec<(usize, SinkStatisticsRecord)>,
    },
}

/// A sender broadcasting [`InternalStorageCommand`]s to all workers.
#[derive(Clone)]
pub struct InternalCommandSender {
    tx: mpsc::Sender<InternalStorageCommand>,
    activator: Rc<RefCell<Option<Activator>>>,
}

impl InternalCommandSender {
    /// Broadcasts the given command to all workers.
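    ///
    /// # Panics
    ///
    /// Panics if the internal command channel has disconnected.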
    pub fn send(&self, cmd: InternalStorageCommand) {
        if self.tx.send(cmd).is_err() {
            panic!("internal command channel disconnected");
        }

        self.activator.borrow().as_ref().map(|a| a.activate());
    }
}

/// A receiver for [`InternalStorageCommand`]s broadcast by workers.
pub struct InternalCommandReceiver {
    rx: mpsc::Receiver<InternalStorageCommand>,
}

impl InternalCommandReceiver {
    /// Returns the next available command, if any.
    ///
    /// This returns `None` when there are currently no commands but there might be commands again
    /// in the future.
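    ///
    /// A minimal polling sketch (assumes `internal_cmd_rx` is the receiver returned by
    /// `setup_command_sequencer` and `handle_command` is a hypothetical handler):
    ///
    /// ```ignore
    /// while let Some(command) = internal_cmd_rx.try_recv() {
    ///     handle_command(command);
    /// }
    /// ```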
    pub fn try_recv(&self) -> Option<InternalStorageCommand> {
        match self.rx.try_recv() {
            Ok(cmd) => Some(cmd),
            Err(mpsc::TryRecvError::Empty) => None,
            Err(mpsc::TryRecvError::Disconnected) => {
                panic!("internal command channel disconnected")
            }
        }
    }
}

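/// Sets up the `command_sequencer` dataflow and returns a connected
/// ([`InternalCommandSender`], [`InternalCommandReceiver`]) pair for this worker.
///
/// Commands sent by any worker are exchanged to a single worker, stamped there with a globally
/// unique sequence number, broadcast back to all workers, and surfaced on every worker's
/// receiver in that same order.
///
/// Usage sketch (`timely_worker` and `id` are assumed to be in scope; the surrounding loop is
/// illustrative, not the actual caller):
///
/// ```ignore
/// let (internal_cmd_tx, internal_cmd_rx) = setup_command_sequencer(timely_worker);
///
/// // Any operator holding a clone of `internal_cmd_tx` can broadcast a command ...
/// internal_cmd_tx.send(InternalStorageCommand::DropDataflow(vec![id]));
///
/// // ... and every worker's main loop observes commands in the same global order.
/// while let Some(command) = internal_cmd_rx.try_recv() {
///     // act on `command`
/// }
/// ```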
pub(crate) fn setup_command_sequencer<'w, A: Allocate>(
    timely_worker: &'w mut TimelyWorker<A>,
) -> (InternalCommandSender, InternalCommandReceiver) {
    let (input_tx, input_rx) = mpsc::channel();
    let (output_tx, output_rx) = mpsc::channel();
    let activator = Rc::new(RefCell::new(None));

    timely_worker.dataflow_named::<(), _, _>("command_sequencer", {
        let activator = Rc::clone(&activator);
        move |scope| {
            let scope = &mut scope.with_label();
            // Create a stream of commands received from `input_rx`.
            //
            // The output commands are tagged by worker ID and command index, allowing downstream
            // operators to ensure their correct relative order.
            let stream = source(scope, "command_sequencer::source", |cap, info| {
                *activator.borrow_mut() = Some(scope.activator_for(info.address));

                let worker_id = scope.index();
                let mut cmd_index = 0;
                let mut capability = Some(cap);

                move |output| {
                    let Some(cap) = capability.clone() else {
                        return;
                    };

                    let mut session = output.session(&cap);
                    loop {
                        match input_rx.try_recv() {
                            Ok(command) => {
                                let cmd = IndexedCommand {
                                    index: cmd_index,
                                    command,
                                };
                                session.give((worker_id, cmd));
                                cmd_index += 1;
                            }
                            Err(mpsc::TryRecvError::Empty) => break,
                            Err(mpsc::TryRecvError::Disconnected) => {
                                // Drop our capability to shut down.
                                capability = None;
                                break;
                            }
                        }
                    }
                }
            });

            // Sequence all commands through a single worker to establish a unique order.
            //
            // The output commands are tagged by a command index, allowing downstream operators to
            // ensure their correct relative order.
            let stream = stream.unary_frontier(
                Exchange::new(|_| 0),
                "command_sequencer::sequencer",
                |cap, _info| {
                    let mut cmd_index = 0;
                    let mut capability = Some(cap);

                    // For each worker, keep an ordered list of pending commands, as well as the
                    // current index of the next command.
                    let mut pending_commands = vec![(BTreeSet::new(), 0); scope.peers()];

                    move |(input, frontier), output| {
                        let Some(cap) = capability.clone() else {
                            return;
                        };

                        input.for_each(|_time, data| {
                            for (worker_id, cmd) in data.drain(..) {
                                pending_commands[worker_id].0.insert(cmd);
                            }
                        });

                        let mut session = output.session(&cap);
                        for (commands, next_idx) in &mut pending_commands {
                            while commands.first().is_some_and(|c| c.index == *next_idx) {
                                let mut cmd = commands.pop_first().unwrap();
                                cmd.index = cmd_index;
                                session.give(cmd);

                                *next_idx += 1;
                                cmd_index += 1;
                            }
                        }

                        let _ = session;

                        if frontier.is_empty() {
                            // Drop our capability to shut down.
                            capability = None;
                        }
                    }
                },
            );

            // Broadcast the ordered commands to all workers.
            let stream = stream.broadcast();

            // Sink the stream back into `output_tx`.
            stream.sink(Pipeline, "command_sequencer::sink", {
                // Keep an ordered list of pending commands, as well as the current index of the
                // next command.
                let mut pending_commands = BTreeSet::new();
                let mut next_idx = 0;

                move |(input, _frontier)| {
                    input.for_each(|_time, data| {
                        pending_commands.extend(data.drain(..));
                    });

                    while pending_commands
                        .first()
                        .is_some_and(|c| c.index == next_idx)
                    {
                        let cmd = pending_commands.pop_first().unwrap();
                        let _ = output_tx.send(cmd.command);
                        next_idx += 1;
                    }
                }
            });
        }
    });

    let tx = InternalCommandSender {
        tx: input_tx,
        activator,
    };
    let rx = InternalCommandReceiver { rx: output_rx };

    (tx, rx)
}

// An [`InternalStorageCommand`] tagged with an index.
//
// This is a `(u64, InternalStorageCommand)` in spirit, but implements `Ord` (which
// `InternalStorageCommand` doesn't) by looking only at the index.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct IndexedCommand {
    index: u64,
    command: InternalStorageCommand,
}

impl PartialEq for IndexedCommand {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other).is_eq()
    }
}

impl Eq for IndexedCommand {}

impl PartialOrd for IndexedCommand {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for IndexedCommand {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.index.cmp(&other.index)
    }
}