mz_storage/internal_control.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Types for cluster-internal control messages that can be broadcast to all
7//! workers from individual operators/workers.
8
9use std::cell::RefCell;
10use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::sync::mpsc;
13
14use mz_repr::{GlobalId, Row};
15use mz_rocksdb::config::SharedWriteBufferManager;
16use mz_storage_types::controller::CollectionMetadata;
17use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
18use mz_storage_types::parameters::StorageParameters;
19use mz_storage_types::sinks::StorageSinkDesc;
20use mz_storage_types::sources::IngestionDescription;
21use serde::{Deserialize, Serialize};
22use timely::communication::Allocate;
23use timely::dataflow::channels::pact::{Exchange, Pipeline};
24use timely::dataflow::operators::generic::{OutputHandle, source};
25use timely::dataflow::operators::{Broadcast, Operator};
26use timely::progress::Antichain;
27use timely::scheduling::{Activator, Scheduler};
28use timely::worker::Worker as TimelyWorker;
29
30use crate::statistics::{SinkStatisticsRecord, SourceStatisticsRecord};
31
32/// _Dynamic_ storage instance configuration parameters that are used during dataflow rendering.
33/// Changes to these parameters are applied to `StorageWorker`s in a consistent order
34/// with source and sink creation.
35#[derive(Debug)]
36pub struct DataflowParameters {
37 /// Configuration/tuning for RocksDB. This also contains
38 /// some shared objects, which is why its separate.
39 pub upsert_rocksdb_tuning_config: mz_rocksdb::RocksDBConfig,
40}
41
42impl DataflowParameters {
43 /// Creates a new instance of `DataflowParameters` with given shared rocksdb write buffer manager
44 /// and the cluster memory limit
45 pub fn new(
46 shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
47 cluster_memory_limit: Option<usize>,
48 ) -> Self {
49 Self {
50 upsert_rocksdb_tuning_config: mz_rocksdb::RocksDBConfig::new(
51 shared_rocksdb_write_buffer_manager,
52 cluster_memory_limit,
53 ),
54 }
55 }
56 /// Update the `DataflowParameters` with new configuration.
57 pub fn update(&mut self, storage_parameters: StorageParameters) {
58 self.upsert_rocksdb_tuning_config
59 .apply(storage_parameters.upsert_rocksdb_tuning_config.clone());
60 }
61}
62
63/// Internal commands that can be sent by individual operators/workers that will
64/// be broadcast to all workers. The worker main loop will receive those and act
65/// on them.
66#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
67pub enum InternalStorageCommand {
68 /// Suspend and restart the dataflow identified by the `GlobalId`.
69 SuspendAndRestart {
70 /// The id of the dataflow that should be restarted.
71 id: GlobalId,
72 /// The reason for the restart request.
73 reason: String,
74 },
75 /// Render an ingestion dataflow at the given resumption frontier.
76 CreateIngestionDataflow {
77 /// ID of the ingestion/sourve.
78 id: GlobalId,
79 /// The description of the ingestion/source.
80 ingestion_description: IngestionDescription<CollectionMetadata>,
81 /// The frontier beyond which ingested updates should be uncompacted. Inputs to the
82 /// ingestion are guaranteed to be readable at this frontier.
83 as_of: Antichain<mz_repr::Timestamp>,
84 /// A frontier in the Materialize time domain with the property that all updates not beyond
85 /// it have already been durably ingested.
86 resume_uppers: BTreeMap<GlobalId, Antichain<mz_repr::Timestamp>>,
87 /// A frontier in the source time domain with the property that all updates not beyond it
88 /// have already been durably ingested.
89 source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
90 },
91 /// Render a oneshot ingestion dataflow that fetches data from an external system and stages
92 /// batches in Persist, that can later be appended to the shard.
93 RunOneshotIngestion {
94 /// ID of the running dataflow that is doing the ingestion.
95 ingestion_id: uuid::Uuid,
96 /// ID of the collection we'll create batches for.
97 collection_id: GlobalId,
98 /// Metadata of the collection we'll create batches for.
99 collection_meta: CollectionMetadata,
100 /// Description of the oneshot ingestion.
101 request: OneshotIngestionRequest,
102 },
103 /// Render a sink dataflow.
104 RunSinkDataflow(
105 GlobalId,
106 StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
107 ),
108 /// Drop all state and operators for a dataflow. This is a vec because some
109 /// dataflows have their state spread over multiple IDs (i.e. sources that
110 /// spawn subsources); this means that actions taken in response to this
111 /// command should be permissive about missing state.
112 DropDataflow(Vec<GlobalId>),
113
114 /// Update the configuration for rendering dataflows.
115 UpdateConfiguration {
116 /// The new configuration parameters.
117 storage_parameters: StorageParameters,
118 },
119 /// For moving statistics updates to worker 0.
120 StatisticsUpdate {
121 /// Local statistics, with their epochs.
122 sources: Vec<(usize, SourceStatisticsRecord)>,
123 /// Local statistics, with their epochs.
124 sinks: Vec<(usize, SinkStatisticsRecord)>,
125 },
126}
127
128/// A sender broadcasting [`InternalStorageCommand`]s to all workers.
129#[derive(Clone)]
130pub struct InternalCommandSender {
131 tx: mpsc::Sender<InternalStorageCommand>,
132 activator: Rc<RefCell<Option<Activator>>>,
133}
134
135impl InternalCommandSender {
136 /// Broadcasts the given command to all workers.
137 pub fn send(&self, cmd: InternalStorageCommand) {
138 if self.tx.send(cmd).is_err() {
139 panic!("internal command channel disconnected");
140 }
141
142 self.activator.borrow().as_ref().map(|a| a.activate());
143 }
144}
145
146/// A receiver for [`InternalStorageCommand`]s broadcasted by workers.
147pub struct InternalCommandReceiver {
148 rx: mpsc::Receiver<InternalStorageCommand>,
149}
150
151impl InternalCommandReceiver {
152 /// Returns the next available command, if any.
153 ///
154 /// This returns `None` when there are currently no commands but there might be commands again
155 /// in the future.
156 pub fn try_recv(&self) -> Option<InternalStorageCommand> {
157 match self.rx.try_recv() {
158 Ok(cmd) => Some(cmd),
159 Err(mpsc::TryRecvError::Empty) => None,
160 Err(mpsc::TryRecvError::Disconnected) => {
161 panic!("internal command channel disconnected")
162 }
163 }
164 }
165}
166
167pub(crate) fn setup_command_sequencer<'w, A: Allocate>(
168 timely_worker: &'w mut TimelyWorker<A>,
169) -> (InternalCommandSender, InternalCommandReceiver) {
170 let (input_tx, input_rx) = mpsc::channel();
171 let (output_tx, output_rx) = mpsc::channel();
172 let activator = Rc::new(RefCell::new(None));
173
174 timely_worker.dataflow_named::<(), _, _>("command_sequencer", {
175 let activator = Rc::clone(&activator);
176 move |scope| {
177 // Create a stream of commands received from `input_rx`.
178 //
179 // The output commands are tagged by worker ID and command index, allowing downstream
180 // operators to ensure their correct relative order.
181 let stream = source(scope, "command_sequencer::source", |cap, info| {
182 *activator.borrow_mut() = Some(scope.activator_for(info.address));
183
184 let worker_id = scope.index();
185 let mut cmd_index = 0;
186 let mut capability = Some(cap);
187
188 move |output: &mut OutputHandle<_, _, _>| {
189 let Some(cap) = &capability else {
190 return;
191 };
192
193 let mut session = output.session(cap);
194 loop {
195 match input_rx.try_recv() {
196 Ok(command) => {
197 let cmd = IndexedCommand {
198 index: cmd_index,
199 command,
200 };
201 session.give((worker_id, cmd));
202 cmd_index += 1;
203 }
204 Err(mpsc::TryRecvError::Empty) => break,
205 Err(mpsc::TryRecvError::Disconnected) => {
206 // Drop our capability to shut down.
207 capability = None;
208 break;
209 }
210 }
211 }
212 }
213 });
214
215 // Sequence all commands through a single worker to establish a unique order.
216 //
217 // The output commands are tagged by a command index, allowing downstream operators to
218 // ensure their correct relative order.
219 let stream = stream.unary_frontier(
220 Exchange::new(|_| 0),
221 "command_sequencer::sequencer",
222 |cap, _info| {
223 let mut cmd_index = 0;
224 let mut capability = Some(cap);
225
226 // For each worker, keep an ordered list of pending commands, as well as the
227 // current index of the next command.
228 let mut pending_commands = vec![(BTreeSet::new(), 0); scope.peers()];
229
230 move |input, output: &mut OutputHandle<_, _, _>| {
231 let Some(cap) = &capability else {
232 return;
233 };
234
235 while let Some((_cap, data)) = input.next() {
236 for (worker_id, cmd) in data.drain(..) {
237 pending_commands[worker_id].0.insert(cmd);
238 }
239 }
240
241 let mut session = output.session(cap);
242 for (commands, next_idx) in &mut pending_commands {
243 while commands.first().is_some_and(|c| c.index == *next_idx) {
244 let mut cmd = commands.pop_first().unwrap();
245 cmd.index = cmd_index;
246 session.give(cmd);
247
248 *next_idx += 1;
249 cmd_index += 1;
250 }
251 }
252
253 if input.frontier().is_empty() {
254 // Drop our capability to shut down.
255 capability = None;
256 }
257 }
258 },
259 );
260
261 // Broadcast the ordered commands to all workers.
262 let stream = stream.broadcast();
263
264 // Sink the stream back into `output_tx`.
265 stream.sink(Pipeline, "command_sequencer::sink", {
266 // Keep an ordered list of pending commands, as well as the current index of the
267 // next command.
268 let mut pending_commands = BTreeSet::new();
269 let mut next_idx = 0;
270
271 move |input| {
272 while let Some((_cap, data)) = input.next() {
273 pending_commands.extend(data.drain(..));
274 }
275
276 while pending_commands
277 .first()
278 .is_some_and(|c| c.index == next_idx)
279 {
280 let cmd = pending_commands.pop_first().unwrap();
281 let _ = output_tx.send(cmd.command);
282 next_idx += 1;
283 }
284 }
285 });
286 }
287 });
288
289 let tx = InternalCommandSender {
290 tx: input_tx,
291 activator,
292 };
293 let rx = InternalCommandReceiver { rx: output_rx };
294
295 (tx, rx)
296}
297
298// An [`InternalStorageCommand`] tagged with an index.
299//
300// This is a `(u64, InternalStorageCommand)` in spirit, but implements `Ord` (which
301// `InternalStorageCommand` doesn't) by looking only at the index.
302#[derive(Clone, Debug, Serialize, Deserialize)]
303struct IndexedCommand {
304 index: u64,
305 command: InternalStorageCommand,
306}
307
308impl PartialEq for IndexedCommand {
309 fn eq(&self, other: &Self) -> bool {
310 self.cmp(other).is_eq()
311 }
312}
313
314impl Eq for IndexedCommand {}
315
316impl PartialOrd for IndexedCommand {
317 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
318 Some(self.cmp(other))
319 }
320}
321
322impl Ord for IndexedCommand {
323 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
324 self.index.cmp(&other.index)
325 }
326}