mz_storage/storage_state.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Worker-local state for storage timely instances.
7//!
8//! One instance of a [`Worker`], along with its contained [`StorageState`], is
9//! part of an ensemble of storage workers that all run inside the same timely
10//! cluster. We call this worker a _storage worker_ to disambiguate it from
11//! other kinds of workers, potentially other components that might be sharing
12//! the same timely cluster.
13//!
14//! ## Controller and internal communication
15//!
16//! A worker receives _external_ [`StorageCommands`](StorageCommand) from the
17//! storage controller, via a channel. Storage workers also share an _internal_
18//! control/command fabric ([`internal_control`]). Internal commands go through
19//! a sequencer dataflow that ensures that all workers receive all commands in
20//! the same consistent order.
21//!
22//! We need to make sure that commands that cause dataflows to be rendered are
23//! processed in the same consistent order across all workers because timely
24//! requires this. To achieve this, we make sure that only internal commands can
25//! cause dataflows to be rendered. External commands (from the controller)
26//! cause internal commands to be broadcast (by only one worker), to get
27//! dataflows rendered.
28//!
29//! The internal command fabric is also used to broadcast messages from a local
30//! operator/worker to all workers. For example, when we need to tear down and
31//! restart a dataflow on all workers when an error is encountered.
32//!
33//! ## Async Storage Worker
34//!
35//! The storage worker has a companion [`AsyncStorageWorker`] that must be used
36//! when running code that requires `async`. This is needed because a timely
37//! main loop cannot run `async` code.
38//!
39//! ## Example flow of commands for `RunIngestion`
40//!
41//! With external commands, internal commands, and the async worker,
42//! understanding where and how commands from the controller are realized can
43//! get complicated. We will follow the complete flow for `RunIngestion`, as an
44//! example:
45//!
46//! 1. Worker receives a [`StorageCommand::RunIngestion`] command from the
47//! controller.
48//! 2. This command is processed in [`StorageState::handle_storage_command`].
49//! This step cannot render dataflows, because it does not have access to the
50//! timely worker. It will only set up state that stays over the whole
51//! lifetime of the source, such as the `reported_frontier`. Putting in place
52//! this reported frontier will enable frontier reporting for that source. We
53//! will not start reporting when we only see an internal command for
54//! rendering a dataflow, which can "overtake" the external `RunIngestion`
55//! command.
56//! 3. During processing of that command, we call
57//! [`AsyncStorageWorker::update_ingestion_frontiers`], which causes a command to
58//! be sent to the async worker.
59//! 4. We eventually get a response from the async worker:
60//! [`AsyncStorageWorkerResponse::IngestionFrontiersUpdated`].
61//! 5. This response is handled in [`Worker::handle_async_worker_response`].
62//! 6. Handling that response causes a
63//! [`InternalStorageCommand::CreateIngestionDataflow`] to be broadcast to
64//! all workers via the internal command fabric.
65//! 7. This message will be processed (on each worker) in
66//! [`Worker::handle_internal_storage_command`]. This is what will cause the
67//! required dataflow to be rendered on all workers.
68//!
69//! The process described above assumes that the `RunIngestion` is _not_ an
70//! update, i.e. it is in response to a `CREATE SOURCE`-like statement.
71//!
//! The primary distinction when handling a `RunIngestion` that represents an
//! update is that it might fill out new internal state in the mid-level
//! clients on the way toward being run.
75
76use std::cell::RefCell;
77use std::collections::{BTreeMap, BTreeSet, VecDeque};
78use std::path::PathBuf;
79use std::rc::Rc;
80use std::sync::Arc;
81use std::thread;
82use std::time::Duration;
83
84use fail::fail_point;
85use mz_ore::now::NowFn;
86use mz_ore::soft_assert_or_log;
87use mz_ore::tracing::TracingHandle;
88use mz_persist_client::batch::ProtoBatch;
89use mz_persist_client::cache::PersistClientCache;
90use mz_persist_client::operators::shard_source::ErrorHandler;
91use mz_repr::{GlobalId, Timestamp};
92use mz_rocksdb::config::SharedWriteBufferManager;
93use mz_storage_client::client::{
94 RunIngestionCommand, StatusUpdate, StorageCommand, StorageResponse,
95};
96use mz_storage_types::AlterCompatible;
97use mz_storage_types::configuration::StorageConfiguration;
98use mz_storage_types::connections::ConnectionContext;
99use mz_storage_types::controller::CollectionMetadata;
100use mz_storage_types::dyncfgs::STORAGE_SERVER_MAINTENANCE_INTERVAL;
101use mz_storage_types::oneshot_sources::OneshotIngestionDescription;
102use mz_storage_types::sinks::StorageSinkDesc;
103use mz_storage_types::sources::IngestionDescription;
104use mz_timely_util::builder_async::PressOnDropButton;
105use mz_txn_wal::operator::TxnsContext;
106use timely::order::PartialOrder;
107use timely::progress::Timestamp as _;
108use timely::progress::frontier::Antichain;
109use timely::worker::Worker as TimelyWorker;
110use tokio::sync::mpsc::error::TryRecvError;
111use tokio::sync::{mpsc, watch};
112use tokio::time::Instant;
113use tracing::{debug, info, warn};
114use uuid::Uuid;
115
116use crate::internal_control::{
117 self, DataflowParameters, InternalCommandReceiver, InternalCommandSender,
118 InternalStorageCommand,
119};
120use crate::metrics::StorageMetrics;
121use crate::statistics::{AggregatedStatistics, SinkStatistics, SourceStatistics};
122use crate::storage_state::async_storage_worker::{AsyncStorageWorker, AsyncStorageWorkerResponse};
123
124pub mod async_storage_worker;
125
/// Receiving end of the channel over which the storage controller delivers
/// [`StorageCommand`]s to this worker.
type CommandReceiver = mpsc::UnboundedReceiver<StorageCommand>;
/// Sending end of the channel over which this worker sends
/// [`StorageResponse`]s back to the storage controller.
type ResponseSender = mpsc::UnboundedSender<StorageResponse>;
128
/// State maintained for each worker thread.
///
/// Much of this state can be viewed as local variables for the worker thread,
/// holding state that persists across function calls.
pub struct Worker<'w> {
    /// The underlying Timely worker.
    ///
    /// NOTE: This is `pub` for testing.
    pub timely_worker: &'w mut TimelyWorker,
    /// The channel over which communication handles for newly connected clients
    /// are delivered. Each element carries a nonce identifying the connection,
    /// the receiver for controller commands, and the sender for responses back
    /// to the controller.
    pub client_rx: mpsc::UnboundedReceiver<(Uuid, CommandReceiver, ResponseSender)>,
    /// The state associated with collection ingress and egress.
    pub storage_state: StorageState,
}
144
145impl<'w> Worker<'w> {
146 /// Creates new `Worker` state from the given components.
147 pub fn new(
148 timely_worker: &'w mut TimelyWorker,
149 client_rx: mpsc::UnboundedReceiver<(Uuid, CommandReceiver, ResponseSender)>,
150 metrics: StorageMetrics,
151 now: NowFn,
152 connection_context: ConnectionContext,
153 instance_context: StorageInstanceContext,
154 persist_clients: Arc<PersistClientCache>,
155 txns_ctx: TxnsContext,
156 tracing_handle: Arc<TracingHandle>,
157 shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
158 ) -> Self {
159 // It is very important that we only create the internal control
160 // flow/command sequencer once because a) the worker state is re-used
161 // when a new client connects and b) dataflows that have already been
162 // rendered into the timely worker are reused as well.
163 //
164 // If we created a new sequencer every time we get a new client (likely
165 // because the controller re-started and re-connected), dataflows that
166 // were rendered before would still hold a handle to the old sequencer
167 // but we would not read their commands anymore.
168 let (internal_cmd_tx, internal_cmd_rx) =
169 internal_control::setup_command_sequencer(timely_worker);
170
171 let storage_configuration =
172 StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs());
173
174 // We always initialize as read_only=true. Only when we're explicitly
175 // allowed do we switch to doing writes.
176 let (read_only_tx, read_only_rx) = watch::channel(true);
177
178 // Similar to the internal command sequencer, it is very important that
179 // we only create the async worker once because a) the worker state is
180 // re-used when a new client connects and b) commands that have already
181 // been sent and might yield a response will be lost if a new iteration
182 // of `run_client` creates a new async worker.
183 //
184 // If we created a new async worker every time we get a new client
185 // (likely because the controller re-started and re-connected), we can
186 // get into an inconsistent state where we think that a dataflow has
187 // been rendered, for example because there is an entry in
188 // `StorageState::ingestions`, while there is not yet a dataflow. This
189 // happens because the dataflow only gets rendered once we get a
190 // response from the async worker and send off an internal command.
191 //
192 // The core idea is that both the sequencer and the async worker are
193 // part of the per-worker state, and must be treated as such, meaning
194 // they must survive between invocations of `run_client`.
195
196 // TODO(aljoscha): This thread unparking business seems brittle, but that's
197 // also how the command channel works currently. We can wrap it inside a
198 // struct that holds both a channel and a `Thread`, but I don't
199 // think that would help too much.
200 let async_worker = async_storage_worker::AsyncStorageWorker::new(
201 thread::current(),
202 Arc::clone(&persist_clients),
203 );
204 let cluster_memory_limit = instance_context.cluster_memory_limit;
205
206 let storage_state = StorageState {
207 source_uppers: BTreeMap::new(),
208 source_tokens: BTreeMap::new(),
209 metrics,
210 reported_frontiers: BTreeMap::new(),
211 ingestions: BTreeMap::new(),
212 exports: BTreeMap::new(),
213 oneshot_ingestions: BTreeMap::new(),
214 now,
215 timely_worker_index: timely_worker.index(),
216 timely_worker_peers: timely_worker.peers(),
217 instance_context,
218 persist_clients,
219 txns_ctx,
220 sink_tokens: BTreeMap::new(),
221 sink_write_frontiers: BTreeMap::new(),
222 dropped_ids: Vec::new(),
223 aggregated_statistics: AggregatedStatistics::new(
224 timely_worker.index(),
225 timely_worker.peers(),
226 ),
227 shared_status_updates: Default::default(),
228 latest_status_updates: Default::default(),
229 initial_status_reported: Default::default(),
230 internal_cmd_tx,
231 internal_cmd_rx,
232 read_only_tx,
233 read_only_rx,
234 async_worker,
235 storage_configuration,
236 dataflow_parameters: DataflowParameters::new(
237 shared_rocksdb_write_buffer_manager,
238 cluster_memory_limit,
239 ),
240 tracing_handle,
241 server_maintenance_interval: Duration::ZERO,
242 };
243
244 // TODO(aljoscha): We might want `async_worker` and `internal_cmd_tx` to
245 // be fields of `Worker` instead of `StorageState`, but at least for the
246 // command flow sources and sinks need access to that. We can refactor
247 // this once we have a clearer boundary between what sources/sinks need
248 // and the full "power" of the internal command flow, which should stay
249 // internal to the worker/not be exposed to source/sink implementations.
250 Self {
251 timely_worker,
252 client_rx,
253 storage_state,
254 }
255 }
256}
257
/// Worker-local state related to the ingress or egress of collections of data.
pub struct StorageState {
    /// The highest observed upper frontier for collection.
    ///
    /// This is shared among all source instances, so that they can jointly advance the
    /// frontier even as other instances are created and dropped. Ideally, the Storage
    /// module would eventually provide one source of truth on this rather than multiple,
    /// and we should aim for that but are not there yet.
    pub source_uppers: BTreeMap<GlobalId, Rc<RefCell<Antichain<mz_repr::Timestamp>>>>,
    /// Handles to created sources, keyed by ID.
    ///
    /// NB: The type of the tokens must not be changed to something other than `PressOnDropButton`
    /// to prevent usage of custom shutdown tokens that are tricky to get right.
    pub source_tokens: BTreeMap<GlobalId, Vec<PressOnDropButton>>,
    /// Metrics for storage objects.
    pub metrics: StorageMetrics,
    /// Tracks the conditional write frontiers we have reported.
    pub reported_frontiers: BTreeMap<GlobalId, Antichain<Timestamp>>,
    /// Descriptions of each installed ingestion.
    pub ingestions: BTreeMap<GlobalId, IngestionDescription<CollectionMetadata>>,
    /// Descriptions of each installed export.
    pub exports: BTreeMap<GlobalId, StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>,
    /// Descriptions of oneshot ingestions that are currently running.
    pub oneshot_ingestions: BTreeMap<uuid::Uuid, OneshotIngestionDescription<ProtoBatch>>,
    /// A function that returns the current wall-clock time.
    pub now: NowFn,
    /// Index of the associated timely dataflow worker.
    pub timely_worker_index: usize,
    /// Peers in the associated timely dataflow worker.
    pub timely_worker_peers: usize,
    /// Other configuration for sources and sinks.
    pub instance_context: StorageInstanceContext,
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    /// This is intentionally shared between workers.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// Tokens that should be dropped when a dataflow is dropped to clean up
    /// associated state.
    ///
    /// NB: The type of the tokens must not be changed to something other than `PressOnDropButton`
    /// to prevent usage of custom shutdown tokens that are tricky to get right.
    pub sink_tokens: BTreeMap<GlobalId, Vec<PressOnDropButton>>,
    /// Frontier of sink writes (all subsequent writes will be at times at or
    /// equal to this frontier).
    pub sink_write_frontiers: BTreeMap<GlobalId, Rc<RefCell<Antichain<Timestamp>>>>,
    /// Collection ids that have been dropped but not yet reported as dropped.
    pub dropped_ids: Vec<GlobalId>,

    /// Statistics for sources and sinks.
    pub aggregated_statistics: AggregatedStatistics,

    /// A place shared with running dataflows, so that health operators, can
    /// report status updates back to us.
    ///
    /// **NOTE**: Operators that append to this collection should take care to only add new
    /// status updates if the status of the ingestion/export in question has _changed_.
    pub shared_status_updates: Rc<RefCell<Vec<StatusUpdate>>>,

    /// The latest status update for each object.
    pub latest_status_updates: BTreeMap<GlobalId, StatusUpdate>,

    /// Whether we have reported the initial status after connecting to a new client.
    /// This is reset to false when a new client connects.
    pub initial_status_reported: bool,

    /// Sender for cluster-internal storage commands. These can be sent from
    /// within workers/operators and will be distributed to all workers. For
    /// example, for shutting down an entire dataflow from within a
    /// operator/worker.
    pub internal_cmd_tx: InternalCommandSender,
    /// Receiver for cluster-internal storage commands.
    pub internal_cmd_rx: InternalCommandReceiver,

    /// When this replica/cluster is in read-only mode it must not affect any
    /// changes to external state. This flag can only be changed by a
    /// [StorageCommand::AllowWrites].
    ///
    /// Everything running on this replica/cluster must obey this flag. At the
    /// time of writing, nothing currently looks at this flag.
    /// TODO(benesch): fix this.
    ///
    /// NOTE: In the future, we might want a more complicated flag, for example
    /// something that tells us after which timestamp we are allowed to write.
    /// In this first version we are keeping things as simple as possible!
    pub read_only_rx: watch::Receiver<bool>,

    /// Send-side for read-only state.
    pub read_only_tx: watch::Sender<bool>,

    /// Async worker companion, used for running code that requires async, which
    /// the timely main loop cannot do.
    pub async_worker: AsyncStorageWorker<mz_repr::Timestamp>,

    /// Configuration for source and sink connections.
    pub storage_configuration: StorageConfiguration,
    /// Dynamically configurable parameters that control how dataflows are rendered.
    /// NOTE(guswynn): we should consider moving these into `storage_configuration`.
    pub dataflow_parameters: DataflowParameters,

    /// A process-global handle to tracing configuration.
    pub tracing_handle: Arc<TracingHandle>,

    /// Interval at which to perform server maintenance tasks. Set to a zero interval to
    /// perform maintenance with every `step_or_park` invocation.
    pub server_maintenance_interval: Duration,
}
363
364impl StorageState {
365 /// Return an error handler that triggers a suspend and restart of the corresponding storage
366 /// dataflow.
367 pub fn error_handler(&self, context: &'static str, id: GlobalId) -> ErrorHandler {
368 let tx = self.internal_cmd_tx.clone();
369 ErrorHandler::signal(move |e| {
370 tx.send(InternalStorageCommand::SuspendAndRestart {
371 id,
372 reason: format!("{context}: {e:#}"),
373 })
374 })
375 }
376}
377
/// Extra context for a storage instance.
/// This is extra information that is used when rendering source
/// and sinks that is not tied to the source/connection configuration itself.
#[derive(Clone)]
pub struct StorageInstanceContext {
    /// A directory that can be used for scratch work.
    pub scratch_directory: Option<PathBuf>,
    /// A global `rocksdb::Env`, shared across ALL instances of `RocksDB` (even
    /// across sources!). This `Env` lets us control some resources (like background threads)
    /// process-wide.
    pub rocksdb_env: rocksdb::Env,
    /// The memory limit of the materialize cluster replica. This will
    /// be used to calculate and configure the maximum inflight bytes for backpressure.
    pub cluster_memory_limit: Option<usize>,
}
393
394impl StorageInstanceContext {
395 /// Build a new `StorageInstanceContext`.
396 pub fn new(
397 scratch_directory: Option<PathBuf>,
398 cluster_memory_limit: Option<usize>,
399 ) -> Result<Self, anyhow::Error> {
400 // If no file system is available, fall back to running RocksDB in memory.
401 let rocksdb_env = if scratch_directory.is_some() {
402 rocksdb::Env::new()?
403 } else {
404 rocksdb::Env::mem_env()?
405 };
406
407 Ok(Self {
408 scratch_directory,
409 rocksdb_env,
410 cluster_memory_limit,
411 })
412 }
413}
414
415impl<'w> Worker<'w> {
416 /// Waits for client connections and runs them to completion.
417 pub fn run(&mut self) {
418 while let Some((_nonce, rx, tx)) = self.client_rx.blocking_recv() {
419 self.run_client(rx, tx);
420 }
421 }
422
    /// Runs this (timely) storage worker until the given `command_rx` is
    /// disconnected.
    ///
    /// See the [module documentation](crate::storage_state) for this
    /// worker's responsibilities, how it communicates with the other workers
    /// and how commands flow from the controller and through the workers.
    fn run_client(&mut self, mut command_rx: CommandReceiver, response_tx: ResponseSender) {
        // At this point, all workers are still reading from the command flow.
        if self.reconcile(&mut command_rx).is_err() {
            return;
        }

        // The last time we reported statistics.
        let mut last_stats_time = Instant::now();

        // The last time we did periodic maintenance.
        let mut last_maintenance = std::time::Instant::now();

        let mut disconnected = false;
        while !disconnected {
            let config = &self.storage_state.storage_configuration;
            let stats_interval = config.parameters.statistics_collection_interval;

            let maintenance_interval = self.storage_state.server_maintenance_interval;

            let now = std::time::Instant::now();
            // Determine if we need to perform maintenance, which is true if `maintenance_interval`
            // time has passed since the last maintenance.
            let sleep_duration;
            if now >= last_maintenance + maintenance_interval {
                last_maintenance = now;
                sleep_duration = None;

                self.report_frontier_progress(&response_tx);
            } else {
                // We didn't perform maintenance, sleep until the next maintenance interval.
                let next_maintenance = last_maintenance + maintenance_interval;
                sleep_duration = Some(next_maintenance.saturating_duration_since(now))
            }

            // Ask Timely to execute a unit of work.
            //
            // If there are no pending commands or responses from the async
            // worker, we ask Timely to park the thread if there's nothing to
            // do. We rely on another thread unparking us when there's new work
            // to be done, e.g., when sending a command or when new Kafka
            // messages have arrived.
            //
            // It is critical that we allow Timely to park iff there are no
            // pending commands or responses. The command may have already been
            // consumed by the call to `client_rx.recv`. See:
            // https://github.com/MaterializeInc/materialize/pull/13973#issuecomment-1200312212
            if command_rx.is_empty() && self.storage_state.async_worker.is_empty() {
                // Make sure we wake up again to report any pending statistics updates.
                let mut park_duration = stats_interval.saturating_sub(last_stats_time.elapsed());
                if let Some(sleep_duration) = sleep_duration {
                    park_duration = std::cmp::min(sleep_duration, park_duration);
                }
                self.timely_worker.step_or_park(Some(park_duration));
            } else {
                self.timely_worker.step();
            }

            // Report any dropped ids to the controller.
            for id in std::mem::take(&mut self.storage_state.dropped_ids) {
                self.send_storage_response(&response_tx, StorageResponse::DroppedId(id));
            }

            self.process_oneshot_ingestions(&response_tx);

            self.report_status_updates(&response_tx);

            if last_stats_time.elapsed() >= stats_interval {
                self.report_storage_statistics(&response_tx);
                last_stats_time = Instant::now();
            }

            // Handle any commands received from the controller.
            loop {
                match command_rx.try_recv() {
                    Ok(cmd) => self.storage_state.handle_storage_command(cmd),
                    Err(TryRecvError::Empty) => break,
                    Err(TryRecvError::Disconnected) => {
                        disconnected = true;
                        break;
                    }
                }
            }

            // Handle responses from the async worker.
            while let Ok(response) = self.storage_state.async_worker.try_recv() {
                self.handle_async_worker_response(response);
            }

            // Handle any received internal (cluster-side) commands.
            while let Some(command) = self.storage_state.internal_cmd_rx.try_recv() {
                self.handle_internal_storage_command(command);
            }
        }
    }
523
524 /// Entry point for applying a response from the async storage worker.
525 pub fn handle_async_worker_response(
526 &self,
527 async_response: AsyncStorageWorkerResponse<mz_repr::Timestamp>,
528 ) {
529 // NOTE: If we want to share the load of async processing we
530 // have to change `handle_storage_command` and change this
531 // assert.
532 assert_eq!(
533 self.timely_worker.index(),
534 0,
535 "only worker #0 is doing async processing"
536 );
537 match async_response {
538 AsyncStorageWorkerResponse::IngestionFrontiersUpdated {
539 id,
540 ingestion_description,
541 as_of,
542 resume_uppers,
543 source_resume_uppers,
544 } => {
545 self.storage_state.internal_cmd_tx.send(
546 InternalStorageCommand::CreateIngestionDataflow {
547 id,
548 ingestion_description,
549 as_of,
550 resume_uppers,
551 source_resume_uppers,
552 },
553 );
554 }
555 AsyncStorageWorkerResponse::ExportFrontiersUpdated { id, description } => {
556 self.storage_state
557 .internal_cmd_tx
558 .send(InternalStorageCommand::RunSinkDataflow(id, description));
559 }
560 AsyncStorageWorkerResponse::DropDataflow(id) => {
561 self.storage_state
562 .internal_cmd_tx
563 .send(InternalStorageCommand::DropDataflow(vec![id]));
564 }
565 }
566 }
567
    /// Entry point for applying an internal storage command.
    ///
    /// Internal commands arrive via the sequencer fabric and are therefore
    /// seen in the same order on all workers, which is what allows the
    /// dataflow-rendering variants to be handled here.
    pub fn handle_internal_storage_command(&mut self, internal_cmd: InternalStorageCommand) {
        match internal_cmd {
            InternalStorageCommand::SuspendAndRestart { id, reason } => {
                info!(
                    "worker {}/{} initiating suspend-and-restart for {id} because of: {reason}",
                    self.timely_worker.index(),
                    self.timely_worker.peers(),
                );

                let maybe_ingestion = self.storage_state.ingestions.get(&id).cloned();
                if let Some(ingestion_description) = maybe_ingestion {
                    // Yank the token of the previously existing source dataflow. Note that this
                    // token also includes any source exports/subsources.
                    let maybe_token = self.storage_state.source_tokens.remove(&id);
                    if maybe_token.is_none() {
                        // Something has dropped the source. Make sure we don't
                        // accidentally re-create it.
                        return;
                    }

                    // This needs to be done by one worker, which will
                    // broadcast a `CreateIngestionDataflow` command to all
                    // workers based on the response that contains the
                    // resumption upper.
                    //
                    // Doing this separately on each worker could lead to
                    // differing resume_uppers which might lead to all kinds of
                    // mayhem.
                    //
                    // TODO(aljoscha): If we ever become worried that this is
                    // putting undue pressure on worker 0 we can pick the
                    // designated worker for a source/sink based on `id.hash()`.
                    if self.timely_worker.index() == 0 {
                        for (id, _) in ingestion_description.source_exports.iter() {
                            self.storage_state
                                .aggregated_statistics
                                .advance_global_epoch(*id);
                        }
                        self.storage_state
                            .async_worker
                            .update_ingestion_frontiers(id, ingestion_description);
                    }

                    // Continue with other commands.
                    return;
                }

                let maybe_sink = self.storage_state.exports.get(&id).cloned();
                if let Some(sink_description) = maybe_sink {
                    // Yank the token of the previously existing sink
                    // dataflow.
                    let maybe_token = self.storage_state.sink_tokens.remove(&id);

                    if maybe_token.is_none() {
                        // Something has dropped the sink. Make sure we don't
                        // accidentally re-create it.
                        return;
                    }

                    // This needs to be broadcast by one worker and go through
                    // the internal command fabric, to ensure consistent
                    // ordering of dataflow rendering across all workers.
                    if self.timely_worker.index() == 0 {
                        self.storage_state
                            .aggregated_statistics
                            .advance_global_epoch(id);
                        self.storage_state
                            .async_worker
                            .update_sink_frontiers(id, sink_description);
                    }

                    // Continue with other commands.
                    return;
                }

                if !self
                    .storage_state
                    .ingestions
                    .values()
                    .any(|v| v.source_exports.contains_key(&id))
                {
                    // Our current approach to dropping a source results in a race between shard
                    // finalization (which happens in the controller) and dataflow shutdown (which
                    // happens in clusterd). If a source is created and dropped fast enough -or the
                    // two commands get sufficiently delayed- then it's possible to receive a
                    // SuspendAndRestart command for an unknown source. We cannot assert that this
                    // never happens but we log a warning here to track how often this happens.
                    warn!(
                        "got InternalStorageCommand::SuspendAndRestart for something that is not a source or sink: {id}"
                    );
                }
            }
            InternalStorageCommand::CreateIngestionDataflow {
                id: ingestion_id,
                mut ingestion_description,
                as_of,
                mut resume_uppers,
                mut source_resume_uppers,
            } => {
                info!(
                    ?as_of,
                    ?resume_uppers,
                    "worker {}/{} trying to (re-)start ingestion {ingestion_id}",
                    self.timely_worker.index(),
                    self.timely_worker.peers(),
                );

                // We initialize statistics before we prune finished exports. We
                // still want to export statistics for these, plus the rendering
                // machinery will get confused if there are not at least
                // statistics for the "main" source.
                for (export_id, export) in ingestion_description.source_exports.iter() {
                    let resume_upper = resume_uppers[export_id].clone();
                    self.storage_state.aggregated_statistics.initialize_source(
                        *export_id,
                        ingestion_id,
                        resume_upper.clone(),
                        || {
                            SourceStatistics::new(
                                *export_id,
                                self.storage_state.timely_worker_index,
                                &self.storage_state.metrics.source_statistics,
                                ingestion_id,
                                &export.storage_metadata.data_shard,
                                export.data_config.envelope.clone(),
                                resume_upper,
                            )
                        },
                    );
                }

                // An export whose resume upper is the empty frontier has
                // already written all the data it ever will.
                let finished_exports: BTreeSet<GlobalId> = resume_uppers
                    .iter()
                    .filter(|(_, frontier)| frontier.is_empty())
                    .map(|(id, _)| *id)
                    .collect();

                // Prune finished exports so we don't render dataflow
                // fragments for them.
                resume_uppers.retain(|id, _| !finished_exports.contains(id));
                source_resume_uppers.retain(|id, _| !finished_exports.contains(id));
                ingestion_description
                    .source_exports
                    .retain(|id, _| !finished_exports.contains(id));

                for id in ingestion_description.collection_ids() {
                    // If there is already a shared upper, we re-use it, to make
                    // sure that parties that are already using the shared upper
                    // can continue doing so.
                    let source_upper = self
                        .storage_state
                        .source_uppers
                        .entry(id.clone())
                        .or_insert_with(|| {
                            Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())))
                        });

                    // Reset a non-empty shared upper back to the minimum
                    // timestamp for the (re-)started dataflow.
                    let mut source_upper = source_upper.borrow_mut();
                    if !source_upper.is_empty() {
                        source_upper.clear();
                        source_upper.insert(mz_repr::Timestamp::minimum());
                    }
                }

                // If all subsources of the source are finished, we can skip rendering entirely.
                // Also, if `as_of` is empty, the dataflow has been finalized, so we can skip it as
                // well.
                //
                // TODO(guswynn|petrosagg): this is a bit hacky, and is a consequence of storage state
                // management being a bit of a mess. we should clean this up and remove weird if
                // statements like this.
                if resume_uppers.values().all(|frontier| frontier.is_empty()) || as_of.is_empty() {
                    info!(
                        ?resume_uppers,
                        ?as_of,
                        "worker {}/{} skipping building ingestion dataflow \
                        for {ingestion_id} because the ingestion is finished",
                        self.timely_worker.index(),
                        self.timely_worker.peers(),
                    );
                    return;
                }

                crate::render::build_ingestion_dataflow(
                    self.timely_worker,
                    &mut self.storage_state,
                    ingestion_id,
                    ingestion_description,
                    as_of,
                    resume_uppers,
                    source_resume_uppers,
                );
            }
            InternalStorageCommand::RunOneshotIngestion {
                ingestion_id,
                collection_id,
                collection_meta,
                request,
            } => {
                crate::render::build_oneshot_ingestion_dataflow(
                    self.timely_worker,
                    &mut self.storage_state,
                    ingestion_id,
                    collection_id,
                    collection_meta,
                    request,
                );
            }
            InternalStorageCommand::RunSinkDataflow(sink_id, sink_description) => {
                info!(
                    "worker {}/{} trying to (re-)start sink {sink_id}",
                    self.timely_worker.index(),
                    self.timely_worker.peers(),
                );

                {
                    // If there is already a shared write frontier, we re-use it, to
                    // make sure that parties that are already using the shared
                    // frontier can continue doing so.
                    let sink_write_frontier = self
                        .storage_state
                        .sink_write_frontiers
                        .entry(sink_id.clone())
                        .or_insert_with(|| Rc::new(RefCell::new(Antichain::new())));

                    // Reset the frontier to the minimum timestamp for the
                    // (re-)started dataflow.
                    let mut sink_write_frontier = sink_write_frontier.borrow_mut();
                    sink_write_frontier.clear();
                    sink_write_frontier.insert(mz_repr::Timestamp::minimum());
                }
                self.storage_state
                    .aggregated_statistics
                    .initialize_sink(sink_id, || {
                        SinkStatistics::new(
                            sink_id,
                            self.storage_state.timely_worker_index,
                            &self.storage_state.metrics.sink_statistics,
                        )
                    });

                crate::render::build_export_dataflow(
                    self.timely_worker,
                    &mut self.storage_state,
                    sink_id,
                    sink_description,
                );
            }
            InternalStorageCommand::DropDataflow(ids) => {
                for id in &ids {
                    // Clean up per-source / per-sink state.
                    self.storage_state.source_uppers.remove(id);
                    self.storage_state.source_tokens.remove(id);

                    self.storage_state.sink_tokens.remove(id);
                    self.storage_state.sink_write_frontiers.remove(id);

                    self.storage_state.aggregated_statistics.deinitialize(*id);
                }
            }
            InternalStorageCommand::UpdateConfiguration { storage_parameters } => {
                self.storage_state
                    .dataflow_parameters
                    .update(storage_parameters.clone());
                self.storage_state
                    .storage_configuration
                    .update(storage_parameters);

                // Clear out the updates as we no longer forward them to anyone else to process.
                // We clone `StorageState::storage_configuration` many times during rendering
                // and want to avoid cloning these unused updates.
                self.storage_state
                    .storage_configuration
                    .parameters
                    .dyncfg_updates = Default::default();

                // Remember the maintenance interval locally to avoid reading it from the config set on
                // every server iteration.
                self.storage_state.server_maintenance_interval =
                    STORAGE_SERVER_MAINTENANCE_INTERVAL
                        .get(self.storage_state.storage_configuration.config_set());
            }
            InternalStorageCommand::StatisticsUpdate { sources, sinks } => self
                .storage_state
                .aggregated_statistics
                .ingest(sources, sinks),
        }
    }
853
854 /// Emit information about write frontier progress, along with information that should
855 /// be made durable for this to be the case.
856 ///
857 /// The write frontier progress is "conditional" in that it is not until the information is made
858 /// durable that the data are emitted to downstream workers, and indeed they should not rely on
859 /// the completeness of what they hear until the information is made durable.
860 ///
861 /// Specifically, this sends information about new timestamp bindings created by dataflow workers,
862 /// with the understanding if that if made durable (and ack'd back to the workers) the source will
863 /// in fact progress with this write frontier.
864 pub fn report_frontier_progress(&mut self, response_tx: &ResponseSender) {
865 let mut new_uppers = Vec::new();
866
867 // Check if any observed frontier should advance the reported frontiers.
868 for (id, frontier) in self
869 .storage_state
870 .source_uppers
871 .iter()
872 .chain(self.storage_state.sink_write_frontiers.iter())
873 {
874 let Some(reported_frontier) = self.storage_state.reported_frontiers.get_mut(id) else {
875 // Frontier reporting has not yet been started for this object.
876 // Potentially because this timely worker has not yet seen the
877 // `CreateSources` command.
878 continue;
879 };
880
881 let observed_frontier = frontier.borrow();
882
883 // Only do a thing if it *advances* the frontier, not just *changes* the frontier.
884 // This is protection against `frontier` lagging behind what we have conditionally reported.
885 if PartialOrder::less_than(reported_frontier, &observed_frontier) {
886 new_uppers.push((*id, observed_frontier.clone()));
887 reported_frontier.clone_from(&observed_frontier);
888 }
889 }
890
891 for (id, upper) in new_uppers {
892 self.send_storage_response(response_tx, StorageResponse::FrontierUpper(id, upper));
893 }
894 }
895
896 /// Pumps latest status updates from the buffer shared with operators and
897 /// reports any updates that need reporting.
898 pub fn report_status_updates(&mut self, response_tx: &ResponseSender) {
899 // If we haven't done the initial status report, report all current statuses
900 if !self.storage_state.initial_status_reported {
901 // We pull initially reported status updates to "now", so that they
902 // sort as the latest update in internal status collections. This
903 // makes it so that a newly bootstrapped envd can append status
904 // updates to internal status collections that report an accurate
905 // view as of the time when they came up.
906 let now_ts = mz_ore::now::to_datetime((self.storage_state.now)());
907 let status_updates = self
908 .storage_state
909 .latest_status_updates
910 .values()
911 .cloned()
912 .map(|mut update| {
913 update.timestamp = now_ts.clone();
914 update
915 });
916 for update in status_updates {
917 self.send_storage_response(response_tx, StorageResponse::StatusUpdate(update));
918 }
919 self.storage_state.initial_status_reported = true;
920 }
921
922 // Pump updates into our state and stage them for reporting.
923 for shared_update in self.storage_state.shared_status_updates.take() {
924 self.send_storage_response(
925 response_tx,
926 StorageResponse::StatusUpdate(shared_update.clone()),
927 );
928
929 self.storage_state
930 .latest_status_updates
931 .insert(shared_update.id, shared_update);
932 }
933 }
934
935 /// Report source statistics back to the controller.
936 pub fn report_storage_statistics(&mut self, response_tx: &ResponseSender) {
937 let (sources, sinks) = self.storage_state.aggregated_statistics.emit_local();
938 if !sources.is_empty() || !sinks.is_empty() {
939 self.storage_state
940 .internal_cmd_tx
941 .send(InternalStorageCommand::StatisticsUpdate { sources, sinks })
942 }
943
944 let (sources, sinks) = self.storage_state.aggregated_statistics.snapshot();
945 if !sources.is_empty() || !sinks.is_empty() {
946 self.send_storage_response(
947 response_tx,
948 StorageResponse::StatisticsUpdates(sources, sinks),
949 );
950 }
951 }
952
953 /// Send a response to the coordinator.
954 fn send_storage_response(&self, response_tx: &ResponseSender, response: StorageResponse) {
955 // Ignore send errors because the coordinator is free to ignore our
956 // responses. This happens during shutdown.
957 let _ = response_tx.send(response);
958 }
959
960 fn process_oneshot_ingestions(&mut self, response_tx: &ResponseSender) {
961 for (ingestion_id, ingestion_state) in &mut self.storage_state.oneshot_ingestions {
962 loop {
963 match ingestion_state.results.try_recv() {
964 Ok(result) => {
965 let response = match result {
966 Ok(maybe_batch) => maybe_batch.into_iter().map(Result::Ok).collect(),
967 Err(err) => vec![Err(err)],
968 };
969 let staged_batches = BTreeMap::from([(*ingestion_id, response)]);
970 let _ = response_tx.send(StorageResponse::StagedBatches(staged_batches));
971 }
972 Err(TryRecvError::Empty) => {
973 break;
974 }
975 Err(TryRecvError::Disconnected) => {
976 break;
977 }
978 }
979 }
980 }
981 }
982
983 /// Extract commands until `InitializationComplete`, and make the worker
984 /// reflect those commands. If the worker can not be made to reflect the
985 /// commands, return an error.
986 fn reconcile(&mut self, command_rx: &mut CommandReceiver) -> Result<(), ()> {
987 let worker_id = self.timely_worker.index();
988
989 // To initialize the connection, we want to drain all commands until we
990 // receive a `StorageCommand::InitializationComplete` command to form a
991 // target command state.
992 let mut commands = vec![];
993 loop {
994 match command_rx.blocking_recv().ok_or(())? {
995 StorageCommand::InitializationComplete => break,
996 command => commands.push(command),
997 }
998 }
999
1000 // Track which frontiers this envd expects; we will also set their
1001 // initial timestamp to the minimum timestamp to reset them as we don't
1002 // know what frontiers the new envd expects.
1003 let mut expected_objects = BTreeSet::new();
1004
1005 let mut drop_commands = BTreeSet::new();
1006 let mut running_ingestion_descriptions = self.storage_state.ingestions.clone();
1007 let mut running_exports_descriptions = self.storage_state.exports.clone();
1008
1009 let mut create_oneshot_ingestions: BTreeSet<Uuid> = BTreeSet::new();
1010 let mut cancel_oneshot_ingestions: BTreeSet<Uuid> = BTreeSet::new();
1011
1012 for command in &mut commands {
1013 match command {
1014 StorageCommand::Hello { .. } => {
1015 panic!("Hello must be captured before")
1016 }
1017 StorageCommand::AllowCompaction(id, since) => {
1018 info!(%worker_id, ?id, ?since, "reconcile: received AllowCompaction command");
1019
1020 // collect all "drop commands". These are `AllowCompaction`
1021 // commands that compact to the empty since. Then, later, we make sure
1022 // we retain only those `Create*` commands that are not dropped. We
1023 // assume that the `AllowCompaction` command is ordered after the
1024 // `Create*` commands but don't assert that.
1025 // WIP: Should we assert?
1026 if since.is_empty() {
1027 drop_commands.insert(*id);
1028 }
1029 }
1030 StorageCommand::RunIngestion(ingestion) => {
1031 info!(%worker_id, ?ingestion, "reconcile: received RunIngestion command");
1032
1033 // Ensure that ingestions are forward-rolling alter compatible.
1034 let prev = running_ingestion_descriptions
1035 .insert(ingestion.id, ingestion.description.clone());
1036
1037 if let Some(prev_ingest) = prev {
1038 // If the new ingestion is not exactly equal to the currently running
1039 // ingestion, we must either track that we need to synthesize an update
1040 // command to change the ingestion, or panic.
1041 prev_ingest
1042 .alter_compatible(ingestion.id, &ingestion.description)
1043 .expect("only alter compatible ingestions permitted");
1044 }
1045 }
1046 StorageCommand::RunSink(export) => {
1047 info!(%worker_id, ?export, "reconcile: received RunSink command");
1048
1049 // Ensure that exports are forward-rolling alter compatible.
1050 let prev =
1051 running_exports_descriptions.insert(export.id, export.description.clone());
1052
1053 if let Some(prev_export) = prev {
1054 prev_export
1055 .alter_compatible(export.id, &export.description)
1056 .expect("only alter compatible exports permitted");
1057 }
1058 }
1059 StorageCommand::RunOneshotIngestion(ingestion) => {
1060 info!(%worker_id, ?ingestion, "reconcile: received RunOneshotIngestion command");
1061 create_oneshot_ingestions.insert(ingestion.ingestion_id);
1062 }
1063 StorageCommand::CancelOneshotIngestion(uuid) => {
1064 info!(%worker_id, %uuid, "reconcile: received CancelOneshotIngestion command");
1065 cancel_oneshot_ingestions.insert(*uuid);
1066 }
1067 StorageCommand::InitializationComplete
1068 | StorageCommand::AllowWrites
1069 | StorageCommand::UpdateConfiguration(_) => (),
1070 }
1071 }
1072
1073 let mut seen_most_recent_definition = BTreeSet::new();
1074
1075 // We iterate over this backward to ensure that we keep only the most recent ingestion
1076 // description.
1077 let mut filtered_commands = VecDeque::new();
1078 for mut command in commands.into_iter().rev() {
1079 let mut should_keep = true;
1080 match &mut command {
1081 StorageCommand::Hello { .. } => {
1082 panic!("Hello must be captured before")
1083 }
1084 StorageCommand::RunIngestion(ingestion) => {
1085 // Subsources can be dropped independently of their
1086 // primary source, so we evaluate them in a separate
1087 // loop.
1088 for export_id in ingestion
1089 .description
1090 .source_exports
1091 .keys()
1092 .filter(|export_id| **export_id != ingestion.id)
1093 {
1094 if drop_commands.remove(export_id) {
1095 info!(%worker_id, %export_id, "reconcile: dropping subsource");
1096 self.storage_state.dropped_ids.push(*export_id);
1097 }
1098 }
1099
1100 if drop_commands.remove(&ingestion.id)
1101 || self.storage_state.dropped_ids.contains(&ingestion.id)
1102 {
1103 info!(%worker_id, %ingestion.id, "reconcile: dropping ingestion");
1104
1105 // If an ingestion is dropped, so too must all of
1106 // its subsources (i.e. ingestion exports, as well
1107 // as its progress subsource).
1108 for id in ingestion.description.collection_ids() {
1109 drop_commands.remove(&id);
1110 self.storage_state.dropped_ids.push(id);
1111 }
1112 should_keep = false;
1113 } else {
1114 let most_recent_defintion =
1115 seen_most_recent_definition.insert(ingestion.id);
1116
1117 if most_recent_defintion {
1118 // If this is the most recent definition, this
1119 // is what we will be running when
1120 // reconciliation completes. This definition
1121 // must not include any dropped subsources.
1122 ingestion.description.source_exports.retain(|export_id, _| {
1123 !self.storage_state.dropped_ids.contains(export_id)
1124 });
1125
1126 // After clearing any dropped subsources, we can
1127 // state that we expect all of these to exist.
1128 expected_objects.extend(ingestion.description.collection_ids());
1129 }
1130
1131 let running_ingestion = self.storage_state.ingestions.get(&ingestion.id);
1132
1133 // We keep only:
1134 // - The most recent version of the ingestion, which
1135 // is why these commands are run in reverse.
1136 // - Ingestions whose descriptions are not exactly
1137 // those that are currently running.
1138 should_keep = most_recent_defintion
1139 && running_ingestion != Some(&ingestion.description)
1140 }
1141 }
1142 StorageCommand::RunSink(export) => {
1143 if drop_commands.remove(&export.id)
1144 // If there were multiple `RunSink` in the command
1145 // stream, we want to ensure none of them are
1146 // retained.
1147 || self.storage_state.dropped_ids.contains(&export.id)
1148 {
1149 info!(%worker_id, %export.id, "reconcile: dropping sink");
1150
1151 // Make sure that we report back that the ID was
1152 // dropped.
1153 self.storage_state.dropped_ids.push(export.id);
1154
1155 should_keep = false
1156 } else {
1157 expected_objects.insert(export.id);
1158
1159 let running_sink = self.storage_state.exports.get(&export.id);
1160
1161 // We keep only:
1162 // - The most recent version of the sink, which
1163 // is why these commands are run in reverse.
1164 // - Sinks whose descriptions are not exactly
1165 // those that are currently running.
1166 should_keep = seen_most_recent_definition.insert(export.id)
1167 && running_sink != Some(&export.description);
1168 }
1169 }
1170 StorageCommand::RunOneshotIngestion(ingestion) => {
1171 let already_running = self
1172 .storage_state
1173 .oneshot_ingestions
1174 .contains_key(&ingestion.ingestion_id);
1175 let was_canceled = cancel_oneshot_ingestions.contains(&ingestion.ingestion_id);
1176
1177 should_keep = !already_running && !was_canceled;
1178 }
1179 StorageCommand::CancelOneshotIngestion(ingestion_id) => {
1180 let already_running = self
1181 .storage_state
1182 .oneshot_ingestions
1183 .contains_key(ingestion_id);
1184 should_keep = already_running;
1185 }
1186 StorageCommand::InitializationComplete
1187 | StorageCommand::AllowWrites
1188 | StorageCommand::UpdateConfiguration(_)
1189 | StorageCommand::AllowCompaction(_, _) => (),
1190 }
1191 if should_keep {
1192 filtered_commands.push_front(command);
1193 }
1194 }
1195 let commands = filtered_commands;
1196
1197 // Make sure all the "drop commands" matched up with a source or sink.
1198 // This is also what the regular handler logic for `AllowCompaction`
1199 // would do.
1200 soft_assert_or_log!(
1201 drop_commands.is_empty(),
1202 "AllowCompaction commands for non-existent IDs {:?}",
1203 drop_commands
1204 );
1205
1206 // Determine the ID of all objects we did _not_ see; these are
1207 // considered stale.
1208 let stale_objects = self
1209 .storage_state
1210 .ingestions
1211 .values()
1212 .map(|i| i.collection_ids())
1213 .flatten()
1214 .chain(self.storage_state.exports.keys().copied())
1215 // Objects are considered stale if we did not see them re-created.
1216 .filter(|id| !expected_objects.contains(id))
1217 .collect::<Vec<_>>();
1218 let stale_oneshot_ingestions = self
1219 .storage_state
1220 .oneshot_ingestions
1221 .keys()
1222 .filter(|ingestion_id| {
1223 let to_create = create_oneshot_ingestions.contains(ingestion_id);
1224 let to_drop = cancel_oneshot_ingestions.contains(ingestion_id);
1225 mz_ore::soft_assert_or_log!(
1226 !(!to_create && to_drop),
1227 "attempting to drop oneshot source {ingestion_id} that is not expected to be created during reconciliation"
1228 );
1229 !to_create && !to_drop
1230 })
1231 .copied()
1232 .collect::<Vec<_>>();
1233
1234 info!(
1235 %worker_id, ?expected_objects, ?stale_objects, ?stale_oneshot_ingestions,
1236 "reconcile: modifing storage state to match expected objects",
1237 );
1238
1239 for id in stale_objects {
1240 self.storage_state.drop_collection(id);
1241 }
1242 for id in stale_oneshot_ingestions {
1243 self.storage_state.drop_oneshot_ingestion(id);
1244 }
1245
1246 // Do not report dropping any objects that do not belong to expected
1247 // objects.
1248 self.storage_state
1249 .dropped_ids
1250 .retain(|id| expected_objects.contains(id));
1251
1252 // Do not report any frontiers that do not belong to expected objects.
1253 // Note that this set of objects can differ from the set of sources and
1254 // sinks.
1255 self.storage_state
1256 .reported_frontiers
1257 .retain(|id, _| expected_objects.contains(id));
1258
1259 // Reset the reported frontiers for the remaining objects.
1260 for (_, frontier) in &mut self.storage_state.reported_frontiers {
1261 *frontier = Antichain::from_elem(<_>::minimum());
1262 }
1263
1264 // Reset the initial status reported flag when a new client connects
1265 self.storage_state.initial_status_reported = false;
1266
1267 // Execute the modified commands.
1268 for command in commands {
1269 self.storage_state.handle_storage_command(command);
1270 }
1271
1272 Ok(())
1273 }
1274}
1275
impl StorageState {
    /// Entry point for applying a storage command.
    ///
    /// NOTE: This does not have access to the timely worker and therefore
    /// cannot render dataflows. For dataflow rendering, this needs to either
    /// send asynchronous command to the `async_worker` or internal
    /// commands to the `internal_cmd_tx`.
    pub fn handle_storage_command(&mut self, cmd: StorageCommand) {
        match cmd {
            StorageCommand::Hello { .. } => panic!("Hello must be captured before"),
            StorageCommand::InitializationComplete => (),
            StorageCommand::AllowWrites => {
                // Flip this worker out of read-only mode and re-enable persist
                // compaction.
                self.read_only_tx
                    .send(false)
                    .expect("we're holding one other end");
                self.persist_clients.cfg().enable_compaction();
            }
            StorageCommand::UpdateConfiguration(params) => {
                // These can be done from all workers safely.
                debug!("Applying configuration update: {params:?}");

                // We serialize the dyncfg updates in StorageParameters, but configure
                // persist separately.
                self.persist_clients
                    .cfg()
                    .apply_from(&params.dyncfg_updates);

                params.tracing.apply(self.tracing_handle.as_ref());

                if let Some(log_filter) = &params.tracing.log_filter {
                    self.storage_configuration
                        .connection_context
                        .librdkafka_log_level =
                        mz_ore::tracing::crate_level(&log_filter.clone().into(), "librdkafka");
                }

                // This needs to be broadcast by one worker and go through
                // the internal command fabric, to ensure consistent
                // ordering of dataflow rendering across all workers.
                if self.timely_worker_index == 0 {
                    self.internal_cmd_tx
                        .send(InternalStorageCommand::UpdateConfiguration {
                            storage_parameters: *params,
                        })
                }
            }
            StorageCommand::RunIngestion(ingestion) => {
                let RunIngestionCommand { id, description } = *ingestion;

                // Remember the ingestion description to facilitate possible
                // reconciliation later.
                self.ingestions.insert(id, description.clone());

                // Initialize shared frontier reporting for the ingestion and
                // all collections it exports.
                for id in description.collection_ids() {
                    self.reported_frontiers
                        .entry(id)
                        .or_insert_with(|| Antichain::from_elem(mz_repr::Timestamp::minimum()));
                }

                // This needs to be done by one worker, which will broadcast a
                // `CreateIngestionDataflow` command to all workers based on the response that
                // contains the resumption upper.
                //
                // Doing this separately on each worker could lead to differing resume_uppers
                // which might lead to all kinds of mayhem.
                //
                // n.b. the ingestion on each worker uses the description from worker 0––not the
                // ingestion in the local storage state. This is something we might have
                // interest in fixing in the future, e.g. materialize#19907
                if self.timely_worker_index == 0 {
                    self.async_worker
                        .update_ingestion_frontiers(id, description);
                }
            }
            StorageCommand::RunOneshotIngestion(oneshot) => {
                // Like regular ingestions, oneshot ingestions are rendered via
                // an internal command broadcast by worker 0 to keep dataflow
                // rendering order consistent across workers.
                if self.timely_worker_index == 0 {
                    self.internal_cmd_tx
                        .send(InternalStorageCommand::RunOneshotIngestion {
                            ingestion_id: oneshot.ingestion_id,
                            collection_id: oneshot.collection_id,
                            collection_meta: oneshot.collection_meta,
                            request: oneshot.request,
                        });
                }
            }
            StorageCommand::CancelOneshotIngestion(id) => {
                self.drop_oneshot_ingestion(id);
            }
            StorageCommand::RunSink(export) => {
                // Remember the sink description to facilitate possible
                // reconciliation later.
                let prev = self.exports.insert(export.id, export.description.clone());

                // New sink, add state.
                if prev.is_none() {
                    self.reported_frontiers.insert(
                        export.id,
                        Antichain::from_elem(mz_repr::Timestamp::minimum()),
                    );
                }

                // This needs to be broadcast by one worker and go through the internal command
                // fabric, to ensure consistent ordering of dataflow rendering across all
                // workers.
                if self.timely_worker_index == 0 {
                    self.internal_cmd_tx
                        .send(InternalStorageCommand::RunSinkDataflow(
                            export.id,
                            export.description,
                        ));
                }
            }
            StorageCommand::AllowCompaction(id, frontier) => {
                soft_assert_or_log!(
                    self.exports.contains_key(&id) || self.reported_frontiers.contains_key(&id),
                    "AllowCompaction command for non-existent {id}"
                );

                if frontier.is_empty() {
                    // Indicates that we may drop `id`, as there are no more valid times to read.
                    self.drop_collection(id);
                }
            }
        }
    }

    /// Drop the identified storage collection from the storage state.
    ///
    /// Removes all controller-facing tracking state for `id` and, on worker 0,
    /// forwards the drop to the async worker, which is responsible for tearing
    /// down the dataflow itself.
    fn drop_collection(&mut self, id: GlobalId) {
        // Failpoint for testing crash behavior during drops.
        fail_point!("crash_on_drop");

        self.ingestions.remove(&id);
        self.exports.remove(&id);

        let _ = self.latest_status_updates.remove(&id);

        // This will stop reporting of frontiers.
        //
        // If this object still has its frontiers reported, we will notify the
        // client envd of the drop.
        if self.reported_frontiers.remove(&id).is_some() {
            // The only actions left are internal cleanup, so we can commit to
            // the client that these objects have been dropped.
            //
            // This must be done now rather than in response to `DropDataflow`,
            // otherwise we introduce the possibility of a timing issue where:
            // - We remove all tracking state from the storage state and send
            //   `DropDataflow` (i.e. this block).
            // - While waiting to process that command, we reconcile with a new
            //   envd. That envd has already committed to its catalog that this
            //   object no longer exists.
            // - We process the `DropDataflow` command, and identify that this
            //   object has been dropped.
            // - The next time `dropped_ids` is processed, we send a response
            //   that this ID has been dropped, but the upstream state has no
            //   record of that object having ever existed.
            self.dropped_ids.push(id);
        }

        // Send through async worker for correct ordering with RunIngestion, and
        // dropping the dataflow is done on async worker response.
        if self.timely_worker_index == 0 {
            self.async_worker.drop_dataflow(id);
        }
    }

    /// Drop the identified oneshot ingestion from the storage state.
    ///
    /// Removing the entry drops the associated state (e.g. the results
    /// channel); this is a no-op if the ingestion is not currently tracked.
    fn drop_oneshot_ingestion(&mut self, ingestion_id: uuid::Uuid) {
        let prev = self.oneshot_ingestions.remove(&ingestion_id);
        info!(%ingestion_id, existed = %prev.is_some(), "dropping oneshot ingestion");
    }
}