1use std::cell::RefCell;
10use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::sync::mpsc;
13
14use mz_repr::{GlobalId, Row};
15use mz_rocksdb::config::SharedWriteBufferManager;
16use mz_storage_types::controller::CollectionMetadata;
17use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
18use mz_storage_types::parameters::StorageParameters;
19use mz_storage_types::sinks::StorageSinkDesc;
20use mz_storage_types::sources::IngestionDescription;
21use mz_timely_util::scope_label::ScopeExt;
22use serde::{Deserialize, Serialize};
23use timely::communication::Allocate;
24use timely::dataflow::channels::pact::{Exchange, Pipeline};
25use timely::dataflow::operators::Operator;
26use timely::dataflow::operators::generic::source;
27use timely::dataflow::operators::vec::Broadcast;
28use timely::progress::Antichain;
29use timely::scheduling::{Activator, Scheduler};
30use timely::worker::{AsWorker, Worker as TimelyWorker};
31
32use crate::statistics::{SinkStatisticsRecord, SourceStatisticsRecord};
33
/// Parameters that affect dataflow rendering and can be updated at runtime
/// via [`DataflowParameters::update`].
#[derive(Debug)]
pub struct DataflowParameters {
    /// Tuning configuration for RocksDB, used by upsert state; initialized
    /// from the shared write buffer manager and cluster memory limit.
    pub upsert_rocksdb_tuning_config: mz_rocksdb::RocksDBConfig,
}
43
44impl DataflowParameters {
45 pub fn new(
48 shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
49 cluster_memory_limit: Option<usize>,
50 ) -> Self {
51 Self {
52 upsert_rocksdb_tuning_config: mz_rocksdb::RocksDBConfig::new(
53 shared_rocksdb_write_buffer_manager,
54 cluster_memory_limit,
55 ),
56 }
57 }
58 pub fn update(&mut self, storage_parameters: StorageParameters) {
60 self.upsert_rocksdb_tuning_config
61 .apply(storage_parameters.upsert_rocksdb_tuning_config.clone());
62 }
63}
64
/// Commands that can be sent through the internal command fabric; the
/// sequencer dataflow delivers them to every worker in a consistent order.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum InternalStorageCommand {
    /// Suspend and restart the dataflow identified by the given id.
    SuspendAndRestart {
        /// The id of the dataflow to suspend and restart.
        id: GlobalId,
        /// Human-readable reason for the restart (e.g. for logging/errors).
        reason: String,
    },
    /// Render an ingestion dataflow.
    CreateIngestionDataflow {
        /// The id of the ingestion.
        id: GlobalId,
        /// Description of the ingestion to render.
        ingestion_description: IngestionDescription<CollectionMetadata>,
        /// The frontier at which the dataflow should start reading.
        as_of: Antichain<mz_repr::Timestamp>,
        /// Per-collection frontiers at which output should resume.
        resume_uppers: BTreeMap<GlobalId, Antichain<mz_repr::Timestamp>>,
        /// Source-specific resume positions, encoded as rows.
        /// NOTE(review): the exact row encoding is defined by the source
        /// implementations — confirm against the rendering code.
        source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
    },
    /// Render a oneshot ingestion dataflow (e.g. for a one-time copy).
    RunOneshotIngestion {
        /// Unique id for this oneshot ingestion.
        ingestion_id: uuid::Uuid,
        /// The id of the collection being written to.
        collection_id: GlobalId,
        /// Metadata of the target collection.
        collection_meta: CollectionMetadata,
        /// The description of what to ingest.
        request: OneshotIngestionRequest,
    },
    /// Render a sink dataflow for the given id and sink description.
    RunSinkDataflow(
        GlobalId,
        StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
    ),
    /// Drop the dataflows and state associated with the given ids.
    DropDataflow(Vec<GlobalId>),

    /// Update the configuration of the storage workers.
    UpdateConfiguration {
        /// The new parameter values to apply.
        storage_parameters: StorageParameters,
    },
    /// Statistics reported by workers, keyed by worker index.
    StatisticsUpdate {
        /// Source statistics, paired with the reporting worker's index.
        sources: Vec<(usize, SourceStatisticsRecord)>,
        /// Sink statistics, paired with the reporting worker's index.
        sinks: Vec<(usize, SinkStatisticsRecord)>,
    },
}
129
/// A sender for [`InternalStorageCommand`]s, feeding the sequencer dataflow.
#[derive(Clone)]
pub struct InternalCommandSender {
    // Local channel into the sequencer dataflow's source operator.
    tx: mpsc::Sender<InternalStorageCommand>,
    // Activator for the source operator; `None` until the dataflow has been
    // built and has registered itself (see `setup_command_sequencer`).
    activator: Rc<RefCell<Option<Activator>>>,
}
136
137impl InternalCommandSender {
138 pub fn send(&self, cmd: InternalStorageCommand) {
140 if self.tx.send(cmd).is_err() {
141 panic!("internal command channel disconnected");
142 }
143
144 self.activator.borrow().as_ref().map(|a| a.activate());
145 }
146}
147
/// A receiver for sequenced [`InternalStorageCommand`]s, fed by the sequencer
/// dataflow's sink operator.
pub struct InternalCommandReceiver {
    // Local channel out of the sequencer dataflow's sink operator.
    rx: mpsc::Receiver<InternalStorageCommand>,
}
152
153impl InternalCommandReceiver {
154 pub fn try_recv(&self) -> Option<InternalStorageCommand> {
159 match self.rx.try_recv() {
160 Ok(cmd) => Some(cmd),
161 Err(mpsc::TryRecvError::Empty) => None,
162 Err(mpsc::TryRecvError::Disconnected) => {
163 panic!("internal command channel disconnected")
164 }
165 }
166 }
167}
168
/// Installs the "command_sequencer" dataflow on the given timely worker and
/// returns a local sender/receiver pair connected to it.
///
/// The dataflow establishes a single, consistent order over commands injected
/// by any worker:
///
///  1. A `source` operator on each worker drains that worker's local input
///     channel, tagging each command with the worker id and a per-worker
///     sequence index.
///  2. A `unary_frontier` operator receives all commands on worker 0 (the
///     `Exchange::new(|_| 0)` pact routes everything there), reorders each
///     worker's commands by their per-worker index, and re-stamps them with a
///     single global sequence index.
///  3. The globally sequenced stream is `broadcast` to all workers, where a
///     `sink` operator re-establishes the global order and forwards the
///     commands into the local output channel.
///
/// Operators drop their capabilities (`capability = None`) once their input
/// channel disconnects or their frontier is empty, allowing the dataflow to
/// shut down cleanly.
pub(crate) fn setup_command_sequencer<'w, A: Allocate>(
    timely_worker: &'w mut TimelyWorker<A>,
) -> (InternalCommandSender, InternalCommandReceiver) {
    // Local channels bridging the returned handles and the dataflow operators.
    let (input_tx, input_rx) = mpsc::channel();
    let (output_tx, output_rx) = mpsc::channel();
    // Filled in by the source operator once it knows its own address.
    let activator = Rc::new(RefCell::new(None));

    timely_worker.dataflow_named::<(), _, _>("command_sequencer", {
        let activator = Rc::clone(&activator);
        move |scope| {
            let scope = &mut scope.with_label();
            // Step 1: per-worker ingestion of locally submitted commands.
            let stream = source(scope, "command_sequencer::source", |cap, info| {
                // Register the activator so `InternalCommandSender::send` can
                // wake this operator when new commands arrive.
                *activator.borrow_mut() = Some(scope.activator_for(info.address));

                let worker_id = scope.index();
                // Per-worker sequence index assigned to injected commands.
                let mut cmd_index = 0;
                let mut capability = Some(cap);

                move |output| {
                    // Once the capability is dropped, the operator is shut down.
                    let Some(cap) = capability.clone() else {
                        return;
                    };

                    let mut session = output.session(&cap);
                    loop {
                        match input_rx.try_recv() {
                            Ok(command) => {
                                let cmd = IndexedCommand {
                                    index: cmd_index,
                                    command,
                                };
                                session.give((worker_id, cmd));
                                cmd_index += 1;
                            }
                            Err(mpsc::TryRecvError::Empty) => break,
                            Err(mpsc::TryRecvError::Disconnected) => {
                                // The sender was dropped: release the
                                // capability so the dataflow can wind down.
                                capability = None;
                                break;
                            }
                        }
                    }
                }
            });

            // Step 2: global sequencing. All commands are exchanged to worker
            // 0, which assigns the single global order.
            let stream = stream.unary_frontier(
                Exchange::new(|_| 0),
                "command_sequencer::sequencer",
                |cap, _info| {
                    // Next global sequence index to hand out.
                    let mut cmd_index = 0;
                    let mut capability = Some(cap);

                    // For each worker: commands received so far (ordered by
                    // their per-worker index) and the next per-worker index we
                    // expect to emit. Buffering is needed because commands may
                    // arrive out of order across timely channels.
                    let mut pending_commands = vec![(BTreeSet::new(), 0); scope.peers()];

                    move |(input, frontier), output| {
                        let Some(cap) = capability.clone() else {
                            return;
                        };

                        input.for_each(|_time, data| {
                            for (worker_id, cmd) in data.drain(..) {
                                pending_commands[worker_id].0.insert(cmd);
                            }
                        });

                        let mut session = output.session(&cap);
                        for (commands, next_idx) in &mut pending_commands {
                            // Emit each worker's commands only in per-worker
                            // index order, re-stamped with the global index.
                            while commands.first().is_some_and(|c| c.index == *next_idx) {
                                let mut cmd = commands.pop_first().unwrap();
                                cmd.index = cmd_index;
                                session.give(cmd);

                                *next_idx += 1;
                                cmd_index += 1;
                            }
                        }

                        let _ = session;

                        // Input is complete: release the capability so
                        // downstream operators can shut down too.
                        if frontier.is_empty() {
                            capability = None;
                        }
                    }
                },
            );

            // Step 3: deliver the globally sequenced stream to every worker.
            let stream = stream.broadcast();

            stream.sink(Pipeline, "command_sequencer::sink", {
                // Commands received so far, ordered by global index, and the
                // next global index to release. This re-establishes the global
                // order in case of out-of-order arrival.
                let mut pending_commands = BTreeSet::new();
                let mut next_idx = 0;

                move |(input, _frontier)| {
                    input.for_each(|_time, data| {
                        pending_commands.extend(data.drain(..));
                    });

                    while pending_commands
                        .first()
                        .is_some_and(|c| c.index == next_idx)
                    {
                        let cmd = pending_commands.pop_first().unwrap();
                        // A send error means the receiver was dropped, which
                        // is fine: the commands are no longer needed.
                        let _ = output_tx.send(cmd.command);
                        next_idx += 1;
                    }
                }
            });
        }
    });

    let tx = InternalCommandSender {
        tx: input_tx,
        activator,
    };
    let rx = InternalCommandReceiver { rx: output_rx };

    (tx, rx)
}
302
/// A command tagged with a sequence index, used by the sequencer dataflow to
/// establish and re-establish command order.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct IndexedCommand {
    // Position in the sequence; the only field the manual `Eq`/`Ord` impls
    // consider.
    index: u64,
    // The wrapped command.
    command: InternalStorageCommand,
}
312
313impl PartialEq for IndexedCommand {
314 fn eq(&self, other: &Self) -> bool {
315 self.cmp(other).is_eq()
316 }
317}
318
// Equality is total (delegated to `Ord` on the `u64` index), so `Eq` holds.
impl Eq for IndexedCommand {}
320
321impl PartialOrd for IndexedCommand {
322 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
323 Some(self.cmp(other))
324 }
325}
326
327impl Ord for IndexedCommand {
328 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
329 self.index.cmp(&other.index)
330 }
331}