mz_storage/storage_state.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

//! Worker-local state for storage timely instances.
//!
//! One instance of a [`Worker`], along with its contained [`StorageState`], is
//! part of an ensemble of storage workers that all run inside the same timely
//! cluster. We call this worker a _storage worker_ to disambiguate it from
//! other kinds of workers, such as other components that might share the same
//! timely cluster.
//!
//! ## Controller and internal communication
//!
//! A worker receives _external_ [`StorageCommands`](StorageCommand) from the
//! storage controller, via a channel. Storage workers also share an _internal_
//! control/command fabric ([`internal_control`]). Internal commands go through
//! a sequencer dataflow that ensures that all workers receive all commands in
//! the same consistent order.
//!
//! Commands that cause dataflows to be rendered must be processed in the same
//! consistent order across all workers, because timely requires this. To
//! achieve that, only internal commands can cause dataflows to be rendered.
//! External commands (from the controller) instead cause internal commands to
//! be broadcast (by only one worker), which is what gets dataflows rendered.
//!
//! The internal command fabric is also used to broadcast messages from a local
//! operator/worker to all workers, for example when a dataflow has to be torn
//! down and restarted on all workers after an error is encountered.
//!
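//! For illustration, this is (a lightly trimmed version of) the pattern used
//! in [`StorageState::handle_storage_command`] to turn an external
//! configuration update into an internal command that a single worker
//! broadcasts to all workers:
//!
//! ```ignore
//! // Only worker 0 broadcasts; the sequencer then delivers the internal
//! // command to every worker in the same consistent order.
//! if self.timely_worker_index == 0 {
//!     self.internal_cmd_tx
//!         .send(InternalStorageCommand::UpdateConfiguration {
//!             storage_parameters: *params,
//!         });
//! }
//! ```
//!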
//! ## Async Storage Worker
//!
//! The storage worker has a companion [`AsyncStorageWorker`] that must be used
//! when running code that requires `async`. This is needed because a timely
//! main loop cannot run `async` code.
//!
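//! In practice this means the main loop hands work to the async worker and
//! later drains its responses without blocking, roughly as done in
//! [`Worker::run_client`]:
//!
//! ```ignore
//! // Drain any responses the async worker has produced since the last step.
//! while let Ok(response) = self.storage_state.async_worker.try_recv() {
//!     self.handle_async_worker_response(response);
//! }
//! ```
//!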
//! ## Example flow of commands for `RunIngestion`
//!
//! With external commands, internal commands, and the async worker,
//! understanding where and how commands from the controller are realized can
//! get complicated. As an example, we follow the complete flow for
//! `RunIngestion`:
//!
//! 1. Worker receives a [`StorageCommand::RunIngestion`] command from the
//!    controller.
//! 2. This command is processed in [`StorageState::handle_storage_command`].
//!    This step cannot render dataflows, because it does not have access to
//!    the timely worker. It only sets up state that persists for the whole
//!    lifetime of the source, such as the `reported_frontier`. Putting this
//!    reported frontier in place enables frontier reporting for that source.
//!    We must not start reporting based only on an internal command for
//!    rendering a dataflow, because such a command can "overtake" the external
//!    `RunIngestion` command.
//! 3. During processing of that command, we call
//!    [`AsyncStorageWorker::update_frontiers`], which causes a command to be
//!    sent to the async worker.
//! 4. We eventually get a response from the async worker:
//!    [`AsyncStorageWorkerResponse::FrontiersUpdated`].
//! 5. This response is handled in [`Worker::handle_async_worker_response`].
//! 6. Handling that response causes an
//!    [`InternalStorageCommand::CreateIngestionDataflow`] to be broadcast to
//!    all workers via the internal command fabric.
//! 7. This message is processed (on each worker) in
//!    [`Worker::handle_internal_storage_command`], which is what causes the
//!    required dataflow to be rendered on all workers.
//!
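//! In short, with worker 0 acting as the designated broadcaster:
//!
//! ```text
//! controller   --(RunIngestion)------------> all workers (handle_storage_command)
//! worker 0     --(update_frontiers)--------> async storage worker
//! async worker --(FrontiersUpdated)--------> worker 0
//! worker 0     --(CreateIngestionDataflow)-> all workers (internal command fabric)
//! all workers: render the ingestion dataflow
//! ```
//!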
//! The process described above assumes that the `RunIngestion` is _not_ an
//! update, i.e. that it is in response to a `CREATE SOURCE`-like statement.
//!
//! The primary distinction when handling a `RunIngestion` that represents an
//! update is that it might fill out new internal state in the mid-level
//! clients on the way toward being run.

76use std::cell::RefCell;
77use std::collections::{BTreeMap, BTreeSet, VecDeque};
78use std::path::PathBuf;
79use std::rc::Rc;
80use std::sync::Arc;
81use std::thread;
82use std::time::Duration;
83
84use crossbeam_channel::{RecvError, TryRecvError};
85use fail::fail_point;
86use mz_ore::now::NowFn;
87use mz_ore::tracing::TracingHandle;
88use mz_ore::{soft_assert_or_log, soft_panic_or_log};
89use mz_persist_client::batch::ProtoBatch;
90use mz_persist_client::cache::PersistClientCache;
91use mz_persist_client::operators::shard_source::ErrorHandler;
92use mz_repr::{GlobalId, Timestamp};
93use mz_rocksdb::config::SharedWriteBufferManager;
94use mz_storage_client::client::{
95 RunIngestionCommand, StatusUpdate, StorageCommand, StorageResponse,
96};
97use mz_storage_types::AlterCompatible;
98use mz_storage_types::configuration::StorageConfiguration;
99use mz_storage_types::connections::ConnectionContext;
100use mz_storage_types::controller::CollectionMetadata;
101use mz_storage_types::dyncfgs::STORAGE_SERVER_MAINTENANCE_INTERVAL;
102use mz_storage_types::oneshot_sources::OneshotIngestionDescription;
103use mz_storage_types::sinks::StorageSinkDesc;
104use mz_storage_types::sources::IngestionDescription;
105use mz_timely_util::builder_async::PressOnDropButton;
106use mz_txn_wal::operator::TxnsContext;
107use timely::communication::Allocate;
108use timely::order::PartialOrder;
109use timely::progress::Timestamp as _;
110use timely::progress::frontier::Antichain;
111use timely::worker::Worker as TimelyWorker;
112use tokio::sync::{mpsc, watch};
113use tokio::time::Instant;
114use tracing::{info, warn};
115use uuid::Uuid;
116
117use crate::internal_control::{
118 self, DataflowParameters, InternalCommandReceiver, InternalCommandSender,
119 InternalStorageCommand,
120};
121use crate::metrics::StorageMetrics;
122use crate::statistics::{AggregatedStatistics, SinkStatistics, SourceStatistics};
123use crate::storage_state::async_storage_worker::{AsyncStorageWorker, AsyncStorageWorkerResponse};
124
125pub mod async_storage_worker;
126
127type CommandReceiver = crossbeam_channel::Receiver<StorageCommand>;
128type ResponseSender = mpsc::UnboundedSender<StorageResponse>;
129
130/// State maintained for each worker thread.
131///
132/// Much of this state can be viewed as local variables for the worker thread,
133/// holding state that persists across function calls.
134pub struct Worker<'w, A: Allocate> {
135 /// The underlying Timely worker.
136 ///
137 /// NOTE: This is `pub` for testing.
138 pub timely_worker: &'w mut TimelyWorker<A>,
139 /// The channel over which communication handles for newly connected clients
140 /// are delivered.
141 pub client_rx: crossbeam_channel::Receiver<(CommandReceiver, ResponseSender)>,
142 /// The state associated with collection ingress and egress.
143 pub storage_state: StorageState,
144}
145
146impl<'w, A: Allocate> Worker<'w, A> {
147 /// Creates new `Worker` state from the given components.
148 pub fn new(
149 timely_worker: &'w mut TimelyWorker<A>,
150 client_rx: crossbeam_channel::Receiver<(CommandReceiver, ResponseSender)>,
151 metrics: StorageMetrics,
152 now: NowFn,
153 connection_context: ConnectionContext,
154 instance_context: StorageInstanceContext,
155 persist_clients: Arc<PersistClientCache>,
156 txns_ctx: TxnsContext,
157 tracing_handle: Arc<TracingHandle>,
158 shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
159 ) -> Self {
160 // It is very important that we only create the internal control
161 // flow/command sequencer once because a) the worker state is re-used
162 // when a new client connects and b) dataflows that have already been
163 // rendered into the timely worker are reused as well.
164 //
165 // If we created a new sequencer every time we get a new client (likely
166 // because the controller re-started and re-connected), dataflows that
167 // were rendered before would still hold a handle to the old sequencer
168 // but we would not read their commands anymore.
169 let (internal_cmd_tx, internal_cmd_rx) =
170 internal_control::setup_command_sequencer(timely_worker);
171
172 let storage_configuration =
173 StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs());
174
175 // We always initialize as read_only=true. Only when we're explicitly
176 // allowed do we switch to doing writes.
177 let (read_only_tx, read_only_rx) = watch::channel(true);
178
179 // Similar to the internal command sequencer, it is very important that
180 // we only create the async worker once because a) the worker state is
181 // re-used when a new client connects and b) commands that have already
182 // been sent and might yield a response will be lost if a new iteration
183 // of `run_client` creates a new async worker.
184 //
185 // If we created a new async worker every time we get a new client
186 // (likely because the controller re-started and re-connected), we can
187 // get into an inconsistent state where we think that a dataflow has
188 // been rendered, for example because there is an entry in
189 // `StorageState::ingestions`, while there is not yet a dataflow. This
190 // happens because the dataflow only gets rendered once we get a
191 // response from the async worker and send off an internal command.
192 //
193 // The core idea is that both the sequencer and the async worker are
194 // part of the per-worker state, and must be treated as such, meaning
195 // they must survive between invocations of `run_client`.
196
197 // TODO(aljoscha): This thread unparking business seems brittle, but that's
198 // also how the command channel works currently. We can wrap it inside a
199 // struct that holds both a channel and a `Thread`, but I don't
200 // think that would help too much.
201 let async_worker = async_storage_worker::AsyncStorageWorker::new(
202 thread::current(),
203 Arc::clone(&persist_clients),
204 );
205 let cluster_memory_limit = instance_context.cluster_memory_limit;
206
207 let storage_state = StorageState {
208 source_uppers: BTreeMap::new(),
209 source_tokens: BTreeMap::new(),
210 metrics,
211 reported_frontiers: BTreeMap::new(),
212 ingestions: BTreeMap::new(),
213 exports: BTreeMap::new(),
214 oneshot_ingestions: BTreeMap::new(),
215 now,
216 timely_worker_index: timely_worker.index(),
217 timely_worker_peers: timely_worker.peers(),
218 instance_context,
219 persist_clients,
220 txns_ctx,
221 sink_tokens: BTreeMap::new(),
222 sink_write_frontiers: BTreeMap::new(),
223 dropped_ids: Vec::new(),
224 aggregated_statistics: AggregatedStatistics::new(
225 timely_worker.index(),
226 timely_worker.peers(),
227 ),
228 shared_status_updates: Default::default(),
229 latest_status_updates: Default::default(),
230 initial_status_reported: Default::default(),
231 internal_cmd_tx,
232 internal_cmd_rx,
233 read_only_tx,
234 read_only_rx,
235 async_worker,
236 storage_configuration,
237 dataflow_parameters: DataflowParameters::new(
238 shared_rocksdb_write_buffer_manager,
239 cluster_memory_limit,
240 ),
241 tracing_handle,
242 server_maintenance_interval: Duration::ZERO,
243 };
244
245 // TODO(aljoscha): We might want `async_worker` and `internal_cmd_tx` to
246 // be fields of `Worker` instead of `StorageState`, but at least for the
247 // command flow sources and sinks need access to that. We can refactor
248 // this once we have a clearer boundary between what sources/sinks need
249 // and the full "power" of the internal command flow, which should stay
250 // internal to the worker/not be exposed to source/sink implementations.
251 Self {
252 timely_worker,
253 client_rx,
254 storage_state,
255 }
256 }
257}
258
259/// Worker-local state related to the ingress or egress of collections of data.
260pub struct StorageState {
    /// The highest observed upper frontier for each collection.
    ///
    /// This is shared among all source instances, so that they can jointly advance the
    /// frontier even as other instances are created and dropped. Ideally, the Storage
    /// module would eventually provide one source of truth on this rather than multiple,
    /// and we should aim for that but are not there yet.
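    ///
    /// A minimal sketch (with an illustrative `id`) of how this shared frontier is
    /// (re-)initialized, mirroring what [`Worker::handle_internal_storage_command`] does
    /// when an ingestion dataflow is created:
    ///
    /// ```ignore
    /// let upper = storage_state
    ///     .source_uppers
    ///     .entry(id)
    ///     .or_insert_with(|| Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum()))));
    /// // Reset the shared upper back to the minimum timestamp.
    /// let mut upper = upper.borrow_mut();
    /// upper.clear();
    /// upper.insert(Timestamp::minimum());
    /// ```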
267 pub source_uppers: BTreeMap<GlobalId, Rc<RefCell<Antichain<mz_repr::Timestamp>>>>,
268 /// Handles to created sources, keyed by ID
269 /// NB: The type of the tokens must not be changed to something other than `PressOnDropButton`
270 /// to prevent usage of custom shutdown tokens that are tricky to get right.
271 pub source_tokens: BTreeMap<GlobalId, Vec<PressOnDropButton>>,
272 /// Metrics for storage objects.
273 pub metrics: StorageMetrics,
274 /// Tracks the conditional write frontiers we have reported.
275 pub reported_frontiers: BTreeMap<GlobalId, Antichain<Timestamp>>,
276 /// Descriptions of each installed ingestion.
277 pub ingestions: BTreeMap<GlobalId, IngestionDescription<CollectionMetadata>>,
278 /// Descriptions of each installed export.
279 pub exports: BTreeMap<GlobalId, StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>,
280 /// Descriptions of oneshot ingestions that are currently running.
281 pub oneshot_ingestions: BTreeMap<uuid::Uuid, OneshotIngestionDescription<ProtoBatch>>,
    /// A function that returns the current system time.
283 pub now: NowFn,
284 /// Index of the associated timely dataflow worker.
285 pub timely_worker_index: usize,
286 /// Peers in the associated timely dataflow worker.
287 pub timely_worker_peers: usize,
288 /// Other configuration for sources and sinks.
289 pub instance_context: StorageInstanceContext,
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    /// This is intentionally shared between workers.
292 pub persist_clients: Arc<PersistClientCache>,
293 /// Context necessary for rendering txn-wal operators.
294 pub txns_ctx: TxnsContext,
295 /// Tokens that should be dropped when a dataflow is dropped to clean up
296 /// associated state.
297 /// NB: The type of the tokens must not be changed to something other than `PressOnDropButton`
298 /// to prevent usage of custom shutdown tokens that are tricky to get right.
299 pub sink_tokens: BTreeMap<GlobalId, Vec<PressOnDropButton>>,
    /// Frontier of sink writes (all subsequent writes will be at times at or
    /// beyond this frontier).
302 pub sink_write_frontiers: BTreeMap<GlobalId, Rc<RefCell<Antichain<Timestamp>>>>,
303 /// Collection ids that have been dropped but not yet reported as dropped
304 pub dropped_ids: Vec<GlobalId>,
305
306 /// Statistics for sources and sinks.
307 pub aggregated_statistics: AggregatedStatistics,
308
    /// A place shared with running dataflows, so that health operators can
    /// report status updates back to us.
    ///
    /// **NOTE**: Operators that append to this collection should take care to only add new
    /// status updates if the status of the ingestion/export in question has _changed_.
314 pub shared_status_updates: Rc<RefCell<Vec<StatusUpdate>>>,
315
316 /// The latest status update for each object.
317 pub latest_status_updates: BTreeMap<GlobalId, StatusUpdate>,
318
319 /// Whether we have reported the initial status after connecting to a new client.
320 /// This is reset to false when a new client connects.
321 pub initial_status_reported: bool,
322
    /// Sender for cluster-internal storage commands. These can be sent from
    /// within workers/operators and will be distributed to all workers. For
    /// example, for shutting down an entire dataflow from within an
    /// operator/worker.
327 pub internal_cmd_tx: InternalCommandSender,
328 /// Receiver for cluster-internal storage commands.
329 pub internal_cmd_rx: InternalCommandReceiver,
330
    /// When this replica/cluster is in read-only mode it must not make any
    /// changes to external state. This flag can only be changed by a
    /// [StorageCommand::AllowWrites].
    ///
    /// Everything running on this replica/cluster must obey this flag. At the
    /// time of writing, nothing currently looks at this flag.
    /// TODO(benesch): fix this.
    ///
    /// NOTE: In the future, we might want a more complicated flag, for example
    /// something that tells us after which timestamp we are allowed to write.
    /// In this first version we are keeping things as simple as possible!
342 pub read_only_rx: watch::Receiver<bool>,
343
344 /// Send-side for read-only state.
345 pub read_only_tx: watch::Sender<bool>,
346
347 /// Async worker companion, used for running code that requires async, which
348 /// the timely main loop cannot do.
349 pub async_worker: AsyncStorageWorker<mz_repr::Timestamp>,
350
351 /// Configuration for source and sink connections.
352 pub storage_configuration: StorageConfiguration,
353 /// Dynamically configurable parameters that control how dataflows are rendered.
354 /// NOTE(guswynn): we should consider moving these into `storage_configuration`.
355 pub dataflow_parameters: DataflowParameters,
356
357 /// A process-global handle to tracing configuration.
358 pub tracing_handle: Arc<TracingHandle>,
359
360 /// Interval at which to perform server maintenance tasks. Set to a zero interval to
361 /// perform maintenance with every `step_or_park` invocation.
362 pub server_maintenance_interval: Duration,
363}
364
365impl StorageState {
366 /// Return an error handler that triggers a suspend and restart of the corresponding storage
367 /// dataflow.
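    ///
    /// A hypothetical usage sketch (`source_id` and the context string are
    /// illustrative, not taken from real call sites):
    ///
    /// ```ignore
    /// // When an operator later signals a fatal error through this handler, it
    /// // broadcasts `InternalStorageCommand::SuspendAndRestart` for `source_id`
    /// // to all workers, tearing the dataflow down and restarting it.
    /// let error_handler = storage_state.error_handler("source(kafka)", source_id);
    /// ```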
368 pub fn error_handler(&self, context: &'static str, id: GlobalId) -> ErrorHandler {
369 let tx = self.internal_cmd_tx.clone();
370 ErrorHandler::signal(move |e| {
371 tx.send(InternalStorageCommand::SuspendAndRestart {
372 id,
373 reason: format!("{context}: {e:#}"),
374 })
375 })
376 }
377}
378
/// Extra context for a storage instance.
/// This is extra information that is used when rendering sources
/// and sinks and that is not tied to the source/connection configuration itself.
382#[derive(Clone)]
383pub struct StorageInstanceContext {
384 /// A directory that can be used for scratch work.
385 pub scratch_directory: Option<PathBuf>,
386 /// A global `rocksdb::Env`, shared across ALL instances of `RocksDB` (even
387 /// across sources!). This `Env` lets us control some resources (like background threads)
388 /// process-wide.
389 pub rocksdb_env: rocksdb::Env,
390 /// The memory limit of the materialize cluster replica. This will
391 /// be used to calculate and configure the maximum inflight bytes for backpressure
392 pub cluster_memory_limit: Option<usize>,
393}
394
395impl StorageInstanceContext {
396 /// Build a new `StorageInstanceContext`.
397 pub fn new(
398 scratch_directory: Option<PathBuf>,
399 cluster_memory_limit: Option<usize>,
400 ) -> Result<Self, anyhow::Error> {
401 Ok(Self {
402 scratch_directory,
403 rocksdb_env: rocksdb::Env::new()?,
404 cluster_memory_limit,
405 })
406 }
407
    /// Constructs a new instance context for use in tests.
409 pub fn for_tests(rocksdb_env: rocksdb::Env) -> Self {
410 Self {
411 scratch_directory: None,
412 rocksdb_env,
413 cluster_memory_limit: None,
414 }
415 }
416}
417
418impl<'w, A: Allocate> Worker<'w, A> {
419 /// Waits for client connections and runs them to completion.
420 pub fn run(&mut self) {
421 while let Ok((rx, tx)) = self.client_rx.recv() {
422 self.run_client(rx, tx);
423 }
424 }
425
    /// Runs this (timely) storage worker until the given `command_rx` is
    /// disconnected.
    ///
    /// See the [module documentation](crate::storage_state) for this
    /// worker's responsibilities, how it communicates with the other workers,
    /// and how commands flow from the controller and through the workers.
432 fn run_client(&mut self, command_rx: CommandReceiver, response_tx: ResponseSender) {
433 // At this point, all workers are still reading from the command flow.
434 if self.reconcile(&command_rx).is_err() {
435 return;
436 }
437
438 // The last time we reported statistics.
439 let mut last_stats_time = Instant::now();
440
441 // The last time we did periodic maintenance.
442 let mut last_maintenance = std::time::Instant::now();
443
444 let mut disconnected = false;
445 while !disconnected {
446 let config = &self.storage_state.storage_configuration;
447 let stats_interval = config.parameters.statistics_collection_interval;
448
449 let maintenance_interval = self.storage_state.server_maintenance_interval;
450
451 let now = std::time::Instant::now();
452 // Determine if we need to perform maintenance, which is true if `maintenance_interval`
453 // time has passed since the last maintenance.
454 let sleep_duration;
455 if now >= last_maintenance + maintenance_interval {
456 last_maintenance = now;
457 sleep_duration = None;
458
459 self.report_frontier_progress(&response_tx);
460 } else {
461 // We didn't perform maintenance, sleep until the next maintenance interval.
462 let next_maintenance = last_maintenance + maintenance_interval;
463 sleep_duration = Some(next_maintenance.saturating_duration_since(now))
464 }
465
466 // Ask Timely to execute a unit of work.
467 //
468 // If there are no pending commands or responses from the async
469 // worker, we ask Timely to park the thread if there's nothing to
470 // do. We rely on another thread unparking us when there's new work
471 // to be done, e.g., when sending a command or when new Kafka
472 // messages have arrived.
473 //
474 // It is critical that we allow Timely to park iff there are no
475 // pending commands or responses. The command may have already been
476 // consumed by the call to `client_rx.recv`. See:
477 // https://github.com/MaterializeInc/materialize/pull/13973#issuecomment-1200312212
478 if command_rx.is_empty() && self.storage_state.async_worker.is_empty() {
479 // Make sure we wake up again to report any pending statistics updates.
480 let mut park_duration = stats_interval.saturating_sub(last_stats_time.elapsed());
481 if let Some(sleep_duration) = sleep_duration {
482 park_duration = std::cmp::min(sleep_duration, park_duration);
483 }
484 self.timely_worker.step_or_park(Some(park_duration));
485 } else {
486 self.timely_worker.step();
487 }
488
            // Report any dropped ids.
490 for id in std::mem::take(&mut self.storage_state.dropped_ids) {
491 self.send_storage_response(&response_tx, StorageResponse::DroppedId(id));
492 }
493
494 self.process_oneshot_ingestions(&response_tx);
495
496 self.report_status_updates(&response_tx);
497
498 if last_stats_time.elapsed() >= stats_interval {
499 self.report_storage_statistics(&response_tx);
500 last_stats_time = Instant::now();
501 }
502
503 // Handle any received commands.
504 loop {
505 match command_rx.try_recv() {
506 Ok(cmd) => self.storage_state.handle_storage_command(cmd),
507 Err(TryRecvError::Empty) => break,
508 Err(TryRecvError::Disconnected) => {
509 disconnected = true;
510 break;
511 }
512 }
513 }
514
515 // Handle responses from the async worker.
516 while let Ok(response) = self.storage_state.async_worker.try_recv() {
517 self.handle_async_worker_response(response);
518 }
519
            // Handle any received internal commands.
521 while let Some(command) = self.storage_state.internal_cmd_rx.try_recv() {
522 self.handle_internal_storage_command(command);
523 }
524 }
525 }
526
527 /// Entry point for applying a response from the async storage worker.
528 pub fn handle_async_worker_response(
529 &self,
530 async_response: AsyncStorageWorkerResponse<mz_repr::Timestamp>,
531 ) {
532 // NOTE: If we want to share the load of async processing we
533 // have to change `handle_storage_command` and change this
534 // assert.
535 assert_eq!(
536 self.timely_worker.index(),
537 0,
538 "only worker #0 is doing async processing"
539 );
540 match async_response {
541 AsyncStorageWorkerResponse::FrontiersUpdated {
542 id,
543 ingestion_description,
544 as_of,
545 resume_uppers,
546 source_resume_uppers,
547 } => {
548 self.storage_state.internal_cmd_tx.send(
549 InternalStorageCommand::CreateIngestionDataflow {
550 id,
551 ingestion_description,
552 as_of,
553 resume_uppers,
554 source_resume_uppers,
555 },
556 );
557 }
558 AsyncStorageWorkerResponse::DropDataflow(id) => {
559 self.storage_state
560 .internal_cmd_tx
561 .send(InternalStorageCommand::DropDataflow(vec![id]));
562 }
563 }
564 }
565
566 /// Entry point for applying an internal storage command.
567 pub fn handle_internal_storage_command(&mut self, internal_cmd: InternalStorageCommand) {
568 match internal_cmd {
569 InternalStorageCommand::SuspendAndRestart { id, reason } => {
570 info!(
571 "worker {}/{} initiating suspend-and-restart for {id} because of: {reason}",
572 self.timely_worker.index(),
573 self.timely_worker.peers(),
574 );
575
576 let maybe_ingestion = self.storage_state.ingestions.get(&id).cloned();
577 if let Some(ingestion_description) = maybe_ingestion {
                    // Yank the token of the previously existing source dataflow. Note that
                    // this token also includes any source exports/subsources.
580 let maybe_token = self.storage_state.source_tokens.remove(&id);
581 if maybe_token.is_none() {
582 // Something has dropped the source. Make sure we don't
583 // accidentally re-create it.
584 return;
585 }
586
                    // This needs to be done by one worker, which will
                    // broadcast a `CreateIngestionDataflow` command to all
                    // workers based on the response that contains the
                    // resumption upper.
591 //
592 // Doing this separately on each worker could lead to
593 // differing resume_uppers which might lead to all kinds of
594 // mayhem.
595 //
596 // TODO(aljoscha): If we ever become worried that this is
597 // putting undue pressure on worker 0 we can pick the
598 // designated worker for a source/sink based on `id.hash()`.
599 if self.timely_worker.index() == 0 {
600 for (id, _) in ingestion_description.source_exports.iter() {
601 self.storage_state
602 .aggregated_statistics
603 .advance_global_epoch(*id);
604 }
605 self.storage_state
606 .async_worker
607 .update_frontiers(id, ingestion_description);
608 }
609
610 // Continue with other commands.
611 return;
612 }
613
614 let maybe_sink = self.storage_state.exports.get(&id).cloned();
615 if let Some(sink_description) = maybe_sink {
616 // Yank the token of the previously existing sink
617 // dataflow.
618 let maybe_token = self.storage_state.sink_tokens.remove(&id);
619
620 if maybe_token.is_none() {
621 // Something has dropped the sink. Make sure we don't
622 // accidentally re-create it.
623 return;
624 }
625
626 // This needs to be broadcast by one worker and go through
627 // the internal command fabric, to ensure consistent
628 // ordering of dataflow rendering across all workers.
629 if self.timely_worker.index() == 0 {
630 self.storage_state
631 .aggregated_statistics
632 .advance_global_epoch(id);
633 self.storage_state.internal_cmd_tx.send(
634 InternalStorageCommand::RunSinkDataflow(id, sink_description),
635 );
636 }
637
638 // Continue with other commands.
639 return;
640 }
641
642 if !self
643 .storage_state
644 .ingestions
645 .values()
646 .any(|v| v.source_exports.contains_key(&id))
647 {
648 // Our current approach to dropping a source results in a race between shard
649 // finalization (which happens in the controller) and dataflow shutdown (which
650 // happens in clusterd). If a source is created and dropped fast enough -or the
651 // two commands get sufficiently delayed- then it's possible to receive a
652 // SuspendAndRestart command for an unknown source. We cannot assert that this
                    // never happens, but we log a warning here to track how often it happens.
654 warn!(
655 "got InternalStorageCommand::SuspendAndRestart for something that is not a source or sink: {id}"
656 );
657 }
658 }
659 InternalStorageCommand::CreateIngestionDataflow {
660 id: ingestion_id,
661 mut ingestion_description,
662 as_of,
663 mut resume_uppers,
664 mut source_resume_uppers,
665 } => {
666 info!(
667 ?as_of,
668 ?resume_uppers,
669 "worker {}/{} trying to (re-)start ingestion {ingestion_id}",
670 self.timely_worker.index(),
671 self.timely_worker.peers(),
672 );
673
                // We initialize statistics before we prune finished exports. We
                // still want to export statistics for these, and the rendering
                // machinery gets confused if there are no statistics at least
                // for the "main" source.
678 for (export_id, export) in ingestion_description.source_exports.iter() {
679 let resume_upper = resume_uppers[export_id].clone();
680 self.storage_state.aggregated_statistics.initialize_source(
681 *export_id,
682 resume_upper.clone(),
683 || {
684 SourceStatistics::new(
685 *export_id,
686 self.storage_state.timely_worker_index,
687 &self.storage_state.metrics.source_statistics,
688 ingestion_id,
689 &export.storage_metadata.data_shard,
690 export.data_config.envelope.clone(),
691 resume_upper,
692 )
693 },
694 );
695 }
696
697 let finished_exports: BTreeSet<GlobalId> = resume_uppers
698 .iter()
699 .filter(|(_, frontier)| frontier.is_empty())
700 .map(|(id, _)| *id)
701 .collect();
702
703 resume_uppers.retain(|id, _| !finished_exports.contains(id));
704 source_resume_uppers.retain(|id, _| !finished_exports.contains(id));
705 ingestion_description
706 .source_exports
707 .retain(|id, _| !finished_exports.contains(id));
708
709 for id in ingestion_description.collection_ids() {
710 // If there is already a shared upper, we re-use it, to make
711 // sure that parties that are already using the shared upper
712 // can continue doing so.
713 let source_upper = self
714 .storage_state
715 .source_uppers
716 .entry(id.clone())
717 .or_insert_with(|| {
718 Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())))
719 });
720
721 let mut source_upper = source_upper.borrow_mut();
722 if !source_upper.is_empty() {
723 source_upper.clear();
724 source_upper.insert(mz_repr::Timestamp::minimum());
725 }
726 }
727
728 // If all subsources of the source are finished, we can skip rendering entirely.
729 // Also, if `as_of` is empty, the dataflow has been finalized, so we can skip it as
730 // well.
731 //
732 // TODO(guswynn|petrosagg): this is a bit hacky, and is a consequence of storage state
733 // management being a bit of a mess. we should clean this up and remove weird if
734 // statements like this.
735 if resume_uppers.values().all(|frontier| frontier.is_empty()) || as_of.is_empty() {
736 tracing::info!(
737 ?resume_uppers,
738 ?as_of,
739 "worker {}/{} skipping building ingestion dataflow \
740 for {ingestion_id} because the ingestion is finished",
741 self.timely_worker.index(),
742 self.timely_worker.peers(),
743 );
744 return;
745 }
746
747 crate::render::build_ingestion_dataflow(
748 self.timely_worker,
749 &mut self.storage_state,
750 ingestion_id,
751 ingestion_description,
752 as_of,
753 resume_uppers,
754 source_resume_uppers,
755 );
756 }
757 InternalStorageCommand::RunOneshotIngestion {
758 ingestion_id,
759 collection_id,
760 collection_meta,
761 request,
762 } => {
763 crate::render::build_oneshot_ingestion_dataflow(
764 self.timely_worker,
765 &mut self.storage_state,
766 ingestion_id,
767 collection_id,
768 collection_meta,
769 request,
770 );
771 }
772 InternalStorageCommand::RunSinkDataflow(sink_id, sink_description) => {
773 info!(
774 "worker {}/{} trying to (re-)start sink {sink_id}",
775 self.timely_worker.index(),
776 self.timely_worker.peers(),
777 );
778
779 {
780 // If there is already a shared write frontier, we re-use it, to
781 // make sure that parties that are already using the shared
782 // frontier can continue doing so.
783 let sink_write_frontier = self
784 .storage_state
785 .sink_write_frontiers
786 .entry(sink_id.clone())
787 .or_insert_with(|| Rc::new(RefCell::new(Antichain::new())));
788
789 let mut sink_write_frontier = sink_write_frontier.borrow_mut();
790 sink_write_frontier.clear();
791 sink_write_frontier.insert(mz_repr::Timestamp::minimum());
792 }
793 self.storage_state
794 .aggregated_statistics
795 .initialize_sink(sink_id, || {
796 SinkStatistics::new(
797 sink_id,
798 self.storage_state.timely_worker_index,
799 &self.storage_state.metrics.sink_statistics,
800 )
801 });
802
803 crate::render::build_export_dataflow(
804 self.timely_worker,
805 &mut self.storage_state,
806 sink_id,
807 sink_description,
808 );
809 }
810 InternalStorageCommand::DropDataflow(ids) => {
811 for id in &ids {
812 // Clean up per-source / per-sink state.
813 self.storage_state.source_uppers.remove(id);
814 self.storage_state.source_tokens.remove(id);
815
816 self.storage_state.sink_tokens.remove(id);
817
818 self.storage_state.aggregated_statistics.deinitialize(*id);
819 }
820 }
821 InternalStorageCommand::UpdateConfiguration { storage_parameters } => {
822 self.storage_state
823 .dataflow_parameters
824 .update(storage_parameters.clone());
825 self.storage_state
826 .storage_configuration
827 .update(storage_parameters);
828
829 // Clear out the updates as we no longer forward them to anyone else to process.
830 // We clone `StorageState::storage_configuration` many times during rendering
831 // and want to avoid cloning these unused updates.
832 self.storage_state
833 .storage_configuration
834 .parameters
835 .dyncfg_updates = Default::default();
836
837 // Remember the maintenance interval locally to avoid reading it from the config set on
838 // every server iteration.
839 self.storage_state.server_maintenance_interval =
840 STORAGE_SERVER_MAINTENANCE_INTERVAL
841 .get(self.storage_state.storage_configuration.config_set());
842 }
843 InternalStorageCommand::StatisticsUpdate { sources, sinks } => self
844 .storage_state
845 .aggregated_statistics
846 .ingest(sources, sinks),
847 }
848 }
849
    /// Emit information about write frontier progress, along with information that should
    /// be made durable for this to be the case.
    ///
    /// The write frontier progress is "conditional" in that the data are not emitted to
    /// downstream workers until the information is made durable, and indeed downstream
    /// workers should not rely on the completeness of what they hear until the
    /// information is made durable.
    ///
    /// Specifically, this sends information about new timestamp bindings created by
    /// dataflow workers, with the understanding that, if made durable (and ack'd back to
    /// the workers), the source will in fact progress with this write frontier.
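    ///
    /// For example (illustrative timestamps): if a source's shared upper has advanced to
    /// `[10]` while its reported frontier is still `[5]`, this sends
    /// `StorageResponse::FrontierUpper(id, [10])` and records `[10]` as the new reported
    /// frontier; if the observed frontier had stayed the same or regressed, nothing would
    /// be sent.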
860 pub fn report_frontier_progress(&mut self, response_tx: &ResponseSender) {
861 let mut new_uppers = Vec::new();
862
863 // Check if any observed frontier should advance the reported frontiers.
864 for (id, frontier) in self
865 .storage_state
866 .source_uppers
867 .iter()
868 .chain(self.storage_state.sink_write_frontiers.iter())
869 {
870 let Some(reported_frontier) = self.storage_state.reported_frontiers.get_mut(id) else {
871 // Frontier reporting has not yet been started for this object.
                // Potentially because this timely worker has not yet seen the
                // `RunIngestion` command.
874 continue;
875 };
876
877 let observed_frontier = frontier.borrow();
878
879 // Only do a thing if it *advances* the frontier, not just *changes* the frontier.
880 // This is protection against `frontier` lagging behind what we have conditionally reported.
881 if PartialOrder::less_than(reported_frontier, &observed_frontier) {
882 new_uppers.push((*id, observed_frontier.clone()));
883 reported_frontier.clone_from(&observed_frontier);
884 }
885 }
886
887 for (id, upper) in new_uppers {
888 self.send_storage_response(response_tx, StorageResponse::FrontierUpper(id, upper));
889 }
890 }
891
892 /// Pumps latest status updates from the buffer shared with operators and
893 /// reports any updates that need reporting.
894 pub fn report_status_updates(&mut self, response_tx: &ResponseSender) {
895 // If we haven't done the initial status report, report all current statuses
896 if !self.storage_state.initial_status_reported {
897 // We pull initially reported status updates to "now", so that they
898 // sort as the latest update in internal status collections. This
899 // makes it so that a newly bootstrapped envd can append status
900 // updates to internal status collections that report an accurate
901 // view as of the time when they came up.
902 let now_ts = mz_ore::now::to_datetime((self.storage_state.now)());
903 let status_updates = self
904 .storage_state
905 .latest_status_updates
906 .values()
907 .cloned()
908 .map(|mut update| {
909 update.timestamp = now_ts.clone();
910 update
911 });
912 for update in status_updates {
913 self.send_storage_response(response_tx, StorageResponse::StatusUpdate(update));
914 }
915 self.storage_state.initial_status_reported = true;
916 }
917
918 // Pump updates into our state and stage them for reporting.
919 for shared_update in self.storage_state.shared_status_updates.take() {
920 self.send_storage_response(
921 response_tx,
922 StorageResponse::StatusUpdate(shared_update.clone()),
923 );
924
925 self.storage_state
926 .latest_status_updates
927 .insert(shared_update.id, shared_update);
928 }
929 }
930
931 /// Report source statistics back to the controller.
932 pub fn report_storage_statistics(&mut self, response_tx: &ResponseSender) {
933 let (sources, sinks) = self.storage_state.aggregated_statistics.emit_local();
934 if !sources.is_empty() || !sinks.is_empty() {
935 self.storage_state
936 .internal_cmd_tx
937 .send(InternalStorageCommand::StatisticsUpdate { sources, sinks })
938 }
939
940 let (sources, sinks) = self.storage_state.aggregated_statistics.snapshot();
941 if !sources.is_empty() || !sinks.is_empty() {
942 self.send_storage_response(
943 response_tx,
944 StorageResponse::StatisticsUpdates(sources, sinks),
945 );
946 }
947 }
948
949 /// Send a response to the coordinator.
950 fn send_storage_response(&self, response_tx: &ResponseSender, response: StorageResponse) {
951 // Ignore send errors because the coordinator is free to ignore our
952 // responses. This happens during shutdown.
953 let _ = response_tx.send(response);
954 }
955
956 fn process_oneshot_ingestions(&mut self, response_tx: &ResponseSender) {
957 use tokio::sync::mpsc::error::TryRecvError;
958
959 let mut to_remove = vec![];
960
961 for (ingestion_id, ingestion_state) in &mut self.storage_state.oneshot_ingestions {
962 loop {
963 match ingestion_state.results.try_recv() {
964 Ok(result) => {
965 let response = match result {
966 Ok(maybe_batch) => maybe_batch.into_iter().map(Result::Ok).collect(),
967 Err(err) => vec![Err(err)],
968 };
969 let staged_batches = BTreeMap::from([(*ingestion_id, response)]);
970 let _ = response_tx.send(StorageResponse::StagedBatches(staged_batches));
971 }
972 Err(TryRecvError::Empty) => {
973 break;
974 }
975 Err(TryRecvError::Disconnected) => {
976 to_remove.push(*ingestion_id);
977 break;
978 }
979 }
980 }
981 }
982
983 for ingestion_id in to_remove {
984 tracing::info!(?ingestion_id, "removing oneshot ingestion");
985 self.storage_state.oneshot_ingestions.remove(&ingestion_id);
986 }
987 }
988
    /// Extract commands until `InitializationComplete`, and make the worker
    /// reflect those commands. If the worker cannot be made to reflect the
    /// commands, return an error.
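    ///
    /// At a high level, this is a simplified sketch of the steps below:
    ///
    /// ```text
    /// 1. Drain commands until `InitializationComplete`.
    /// 2. Derive the set of objects the (re-)connected controller expects to exist.
    /// 3. Drop local state and dataflows for objects that were not re-created.
    /// 4. Re-apply the remaining, deduplicated commands.
    /// ```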
992 fn reconcile(&mut self, command_rx: &CommandReceiver) -> Result<(), RecvError> {
993 let worker_id = self.timely_worker.index();
994
995 // To initialize the connection, we want to drain all commands until we
996 // receive a `StorageCommand::InitializationComplete` command to form a
997 // target command state.
998 let mut commands = vec![];
999 loop {
1000 match command_rx.recv()? {
1001 StorageCommand::InitializationComplete => break,
1002 command => commands.push(command),
1003 }
1004 }
1005
        // Track which objects this envd expects to exist; we will also reset
        // their reported frontiers to the minimum timestamp, since we don't
        // know which frontiers the new envd has observed.
1009 let mut expected_objects = BTreeSet::new();
1010
1011 let mut drop_commands = BTreeSet::new();
1012 let mut running_ingestion_descriptions = self.storage_state.ingestions.clone();
1013 let mut running_exports_descriptions = self.storage_state.exports.clone();
1014
1015 let mut create_oneshot_ingestions: BTreeSet<Uuid> = BTreeSet::new();
1016 let mut cancel_oneshot_ingestions: BTreeSet<Uuid> = BTreeSet::new();
1017
1018 for command in &mut commands {
1019 match command {
1020 StorageCommand::CreateTimely { .. } => {
1021 panic!("CreateTimely must be captured before")
1022 }
1023 StorageCommand::AllowCompaction(id, since) => {
1024 info!(%worker_id, ?id, ?since, "reconcile: received AllowCompaction command");
1025
                    // Collect all "drop commands". These are `AllowCompaction`
                    // commands that compact to the empty since. Later, we make sure
                    // we retain only those `Create*` commands that are not dropped. We
                    // assume that the `AllowCompaction` command is ordered after the
                    // `Create*` commands, but we don't assert that.
                    // TODO: Consider asserting this ordering.
1032 if since.is_empty() {
1033 drop_commands.insert(*id);
1034 }
1035 }
1036 StorageCommand::RunIngestion(ingestion) => {
1037 info!(%worker_id, ?ingestion, "reconcile: received RunIngestion command");
1038
1039 // Ensure that ingestions are forward-rolling alter compatible.
1040 let prev = running_ingestion_descriptions
1041 .insert(ingestion.id, ingestion.description.clone());
1042
1043 if let Some(prev_ingest) = prev {
1044 // If the new ingestion is not exactly equal to the currently running
1045 // ingestion, we must either track that we need to synthesize an update
1046 // command to change the ingestion, or panic.
1047 prev_ingest
1048 .alter_compatible(ingestion.id, &ingestion.description)
1049 .expect("only alter compatible ingestions permitted");
1050 }
1051 }
1052 StorageCommand::RunSink(export) => {
1053 info!(%worker_id, ?export, "reconcile: received RunSink command");
1054
1055 // Ensure that exports are forward-rolling alter compatible.
1056 let prev =
1057 running_exports_descriptions.insert(export.id, export.description.clone());
1058
1059 if let Some(prev_export) = prev {
1060 prev_export
1061 .alter_compatible(export.id, &export.description)
1062 .expect("only alter compatible exports permitted");
1063 }
1064 }
1065 StorageCommand::RunOneshotIngestion(ingestion) => {
1066 info!(%worker_id, ?ingestion, "reconcile: received RunOneshotIngestion command");
1067 create_oneshot_ingestions.insert(ingestion.ingestion_id);
1068 }
1069 StorageCommand::CancelOneshotIngestion(uuid) => {
1070 info!(%worker_id, %uuid, "reconcile: received CancelOneshotIngestion command");
1071 cancel_oneshot_ingestions.insert(*uuid);
1072 }
1073 StorageCommand::InitializationComplete
1074 | StorageCommand::AllowWrites
1075 | StorageCommand::UpdateConfiguration(_) => (),
1076 }
1077 }
1078
1079 let mut seen_most_recent_definition = BTreeSet::new();
1080
1081 // We iterate over this backward to ensure that we keep only the most recent ingestion
1082 // description.
1083 let mut filtered_commands = VecDeque::new();
1084 for mut command in commands.into_iter().rev() {
1085 let mut should_keep = true;
1086 match &mut command {
1087 StorageCommand::CreateTimely { .. } => {
1088 panic!("CreateTimely must be captured before")
1089 }
1090 StorageCommand::RunIngestion(ingestion) => {
1091 // Subsources can be dropped independently of their
1092 // primary source, so we evaluate them in a separate
1093 // loop.
1094 for export_id in ingestion
1095 .description
1096 .source_exports
1097 .keys()
1098 .filter(|export_id| **export_id != ingestion.id)
1099 {
1100 if drop_commands.remove(export_id) {
1101 info!(%worker_id, %export_id, "reconcile: dropping subsource");
1102 self.storage_state.dropped_ids.push(*export_id);
1103 }
1104 }
1105
1106 if drop_commands.remove(&ingestion.id)
1107 || self.storage_state.dropped_ids.contains(&ingestion.id)
1108 {
1109 info!(%worker_id, %ingestion.id, "reconcile: dropping ingestion");
1110
1111 // If an ingestion is dropped, so too must all of
1112 // its subsources (i.e. ingestion exports, as well
1113 // as its progress subsource).
1114 for id in ingestion.description.collection_ids() {
1115 drop_commands.remove(&id);
1116 self.storage_state.dropped_ids.push(id);
1117 }
1118 should_keep = false;
1119 } else {
                        let most_recent_definition =
                            seen_most_recent_definition.insert(ingestion.id);
1122
                        if most_recent_definition {
1124 // If this is the most recent definition, this
1125 // is what we will be running when
1126 // reconciliation completes. This definition
1127 // must not include any dropped subsources.
1128 ingestion.description.source_exports.retain(|export_id, _| {
1129 !self.storage_state.dropped_ids.contains(export_id)
1130 });
1131
1132 // After clearing any dropped subsources, we can
1133 // state that we expect all of these to exist.
1134 expected_objects.extend(ingestion.description.collection_ids());
1135 }
1136
1137 let running_ingestion = self.storage_state.ingestions.get(&ingestion.id);
1138
1139 // We keep only:
1140 // - The most recent version of the ingestion, which
1141 // is why these commands are run in reverse.
1142 // - Ingestions whose descriptions are not exactly
1143 // those that are currently running.
                        should_keep = most_recent_definition
                            && running_ingestion != Some(&ingestion.description);
1146 }
1147 }
1148 StorageCommand::RunSink(export) => {
1149 if drop_commands.remove(&export.id)
1150 // If there were multiple `RunSink` in the command
1151 // stream, we want to ensure none of them are
1152 // retained.
1153 || self.storage_state.dropped_ids.contains(&export.id)
1154 {
1155 info!(%worker_id, %export.id, "reconcile: dropping sink");
1156
1157 // Make sure that we report back that the ID was
1158 // dropped.
1159 self.storage_state.dropped_ids.push(export.id);
1160
1161 should_keep = false
1162 } else {
1163 expected_objects.insert(export.id);
1164
1165 let running_sink = self.storage_state.exports.get(&export.id);
1166
1167 // We keep only:
1168 // - The most recent version of the sink, which
1169 // is why these commands are run in reverse.
1170 // - Sinks whose descriptions are not exactly
1171 // those that are currently running.
1172 should_keep = seen_most_recent_definition.insert(export.id)
1173 && running_sink != Some(&export.description);
1174 }
1175 }
1176 StorageCommand::RunOneshotIngestion(ingestion) => {
1177 let already_running = self
1178 .storage_state
1179 .oneshot_ingestions
1180 .contains_key(&ingestion.ingestion_id);
1181 let was_canceled = cancel_oneshot_ingestions.contains(&ingestion.ingestion_id);
1182
1183 should_keep = !already_running && !was_canceled;
1184 }
1185 StorageCommand::CancelOneshotIngestion(ingestion_id) => {
1186 let already_running = self
1187 .storage_state
1188 .oneshot_ingestions
1189 .contains_key(ingestion_id);
1190 should_keep = already_running;
1191 }
1192 StorageCommand::InitializationComplete
1193 | StorageCommand::AllowWrites
1194 | StorageCommand::UpdateConfiguration(_)
1195 | StorageCommand::AllowCompaction(_, _) => (),
1196 }
1197 if should_keep {
1198 filtered_commands.push_front(command);
1199 }
1200 }
1201 let commands = filtered_commands;
1202
1203 // Make sure all the "drop commands" matched up with a source or sink.
1204 // This is also what the regular handler logic for `AllowCompaction`
1205 // would do.
1206 soft_assert_or_log!(
1207 drop_commands.is_empty(),
1208 "AllowCompaction commands for non-existent IDs {:?}",
1209 drop_commands
1210 );
1211
        // Determine the IDs of all objects we did _not_ see; these are
        // considered stale.
1214 let stale_objects = self
1215 .storage_state
1216 .ingestions
1217 .values()
1218 .map(|i| i.collection_ids())
1219 .flatten()
1220 .chain(self.storage_state.exports.keys().copied())
1221 // Objects are considered stale if we did not see them re-created.
1222 .filter(|id| !expected_objects.contains(id))
1223 .collect::<Vec<_>>();
1224 let stale_oneshot_ingestions = self
1225 .storage_state
1226 .oneshot_ingestions
1227 .keys()
1228 .filter(|ingestion_id| {
1229 let created = create_oneshot_ingestions.contains(ingestion_id);
1230 let dropped = cancel_oneshot_ingestions.contains(ingestion_id);
1231 mz_ore::soft_assert_or_log!(
1232 !created && dropped,
1233 "dropped non-existent oneshot source"
1234 );
1235 !created && !dropped
1236 })
1237 .copied()
1238 .collect::<Vec<_>>();
1239
1240 info!(
1241 %worker_id, ?expected_objects, ?stale_objects, ?stale_oneshot_ingestions,
            "reconcile: modifying storage state to match expected objects",
1243 );
1244
1245 for id in stale_objects {
1246 self.storage_state.drop_collection(id);
1247 }
1248 for id in stale_oneshot_ingestions {
1249 self.storage_state.drop_oneshot_ingestion(id);
1250 }
1251
1252 // Do not report dropping any objects that do not belong to expected
1253 // objects.
1254 self.storage_state
1255 .dropped_ids
1256 .retain(|id| expected_objects.contains(id));
1257
1258 // Do not report any frontiers that do not belong to expected objects.
1259 // Note that this set of objects can differ from the set of sources and
1260 // sinks.
1261 self.storage_state
1262 .reported_frontiers
1263 .retain(|id, _| expected_objects.contains(id));
1264
1265 // Reset the reported frontiers for the remaining objects.
1266 for (_, frontier) in &mut self.storage_state.reported_frontiers {
1267 *frontier = Antichain::from_elem(<_>::minimum());
1268 }
1269
1270 // Reset the initial status reported flag when a new client connects
1271 self.storage_state.initial_status_reported = false;
1272
1273 // Execute the modified commands.
1274 for command in commands {
1275 self.storage_state.handle_storage_command(command);
1276 }
1277
1278 Ok(())
1279 }
1280}
1281
1282impl StorageState {
1283 /// Entry point for applying a storage command.
1284 ///
    /// NOTE: This does not have access to the timely worker and therefore
    /// cannot render dataflows. For dataflow rendering, it needs to either
    /// send asynchronous commands to the `async_worker` or internal
    /// commands to the `internal_cmd_tx`.
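    ///
    /// A rough summary of how the match below handles each command (see the code for the
    /// authoritative behavior):
    ///
    /// ```text
    /// InitializationComplete       -> no-op
    /// AllowWrites                  -> leave read-only mode, enable persist compaction
    /// UpdateConfiguration(params)  -> apply locally; worker 0 broadcasts an internal update
    /// RunIngestion(..)             -> record the description; worker 0 asks the async worker
    ///                                 to compute frontiers, which later triggers rendering
    /// RunOneshotIngestion(..)      -> worker 0 broadcasts an internal RunOneshotIngestion
    /// CancelOneshotIngestion(id)   -> drop the oneshot ingestion state
    /// RunSink(..)                  -> record the description; worker 0 broadcasts RunSinkDataflow
    /// AllowCompaction(id, since)   -> update sink as_of; drop the collection if since is empty
    /// ```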
1289 pub fn handle_storage_command(&mut self, cmd: StorageCommand) {
1290 match cmd {
1291 StorageCommand::CreateTimely { .. } => panic!("CreateTimely must be captured before"),
1292 StorageCommand::InitializationComplete => (),
1293 StorageCommand::AllowWrites => {
1294 self.read_only_tx
1295 .send(false)
1296 .expect("we're holding one other end");
1297 self.persist_clients.cfg().enable_compaction();
1298 }
1299 StorageCommand::UpdateConfiguration(params) => {
1300 // These can be done from all workers safely.
1301 tracing::info!("Applying configuration update: {params:?}");
1302
1303 // We serialize the dyncfg updates in StorageParameters, but configure
1304 // persist separately.
1305 self.persist_clients
1306 .cfg()
1307 .apply_from(¶ms.dyncfg_updates);
1308
1309 params.tracing.apply(self.tracing_handle.as_ref());
1310
1311 if let Some(log_filter) = ¶ms.tracing.log_filter {
1312 self.storage_configuration
1313 .connection_context
1314 .librdkafka_log_level =
1315 mz_ore::tracing::crate_level(&log_filter.clone().into(), "librdkafka");
1316 }
1317
1318 // This needs to be broadcast by one worker and go through
1319 // the internal command fabric, to ensure consistent
1320 // ordering of dataflow rendering across all workers.
1321 if self.timely_worker_index == 0 {
1322 self.internal_cmd_tx
1323 .send(InternalStorageCommand::UpdateConfiguration {
1324 storage_parameters: *params,
1325 })
1326 }
1327 }
1328 StorageCommand::RunIngestion(ingestion) => {
1329 let RunIngestionCommand { id, description } = *ingestion;
1330
1331 // Remember the ingestion description to facilitate possible
1332 // reconciliation later.
1333 self.ingestions.insert(id, description.clone());
1334
1335 // Initialize shared frontier reporting.
1336 for id in description.collection_ids() {
1337 self.reported_frontiers
1338 .entry(id)
1339 .or_insert_with(|| Antichain::from_elem(mz_repr::Timestamp::minimum()));
1340 }
1341
                // This needs to be done by one worker, which will broadcast a
                // `CreateIngestionDataflow` command to all workers based on the response
                // that contains the resumption upper.
                //
                // Doing this separately on each worker could lead to differing
                // resume_uppers which might lead to all kinds of mayhem.
                //
                // n.b. the ingestion on each worker uses the description from worker 0,
                // not the ingestion in the local storage state. This is something we
                // might have interest in fixing in the future, e.g. materialize#19907
1352 if self.timely_worker_index == 0 {
1353 self.async_worker.update_frontiers(id, description);
1354 }
1355 }
1356 StorageCommand::RunOneshotIngestion(oneshot) => {
1357 if self.timely_worker_index == 0 {
1358 self.internal_cmd_tx
1359 .send(InternalStorageCommand::RunOneshotIngestion {
1360 ingestion_id: oneshot.ingestion_id,
1361 collection_id: oneshot.collection_id,
1362 collection_meta: oneshot.collection_meta,
1363 request: oneshot.request,
1364 });
1365 }
1366 }
1367 StorageCommand::CancelOneshotIngestion(id) => {
1368 self.drop_oneshot_ingestion(id);
1369 }
1370 StorageCommand::RunSink(export) => {
1371 // Remember the sink description to facilitate possible
1372 // reconciliation later.
1373 let prev = self.exports.insert(export.id, export.description.clone());
1374
1375 // New sink, add state.
1376 if prev.is_none() {
1377 self.reported_frontiers.insert(
1378 export.id,
1379 Antichain::from_elem(mz_repr::Timestamp::minimum()),
1380 );
1381 }
1382
1383 // This needs to be broadcast by one worker and go through the internal command
1384 // fabric, to ensure consistent ordering of dataflow rendering across all
1385 // workers.
1386 if self.timely_worker_index == 0 {
1387 self.internal_cmd_tx
1388 .send(InternalStorageCommand::RunSinkDataflow(
1389 export.id,
1390 export.description,
1391 ));
1392 }
1393 }
1394 StorageCommand::AllowCompaction(id, frontier) => {
1395 match self.exports.get_mut(&id) {
1396 Some(export_description) => {
1397 // Update our knowledge of the `as_of`, in case we need to internally
1398 // restart a sink in the future.
1399 export_description.as_of.clone_from(&frontier);
1400 }
1401 // reported_frontiers contains both ingestions and their
1402 // exports
1403 None if self.reported_frontiers.contains_key(&id) => (),
1404 None => {
1405 soft_panic_or_log!("AllowCompaction command for non-existent {id}");
1406 }
1407 }
1408
1409 if frontier.is_empty() {
1410 // Indicates that we may drop `id`, as there are no more valid times to read.
1411 self.drop_collection(id);
1412 }
1413 }
1414 }
1415 }
1416
1417 /// Drop the identified storage collection from the storage state.
1418 fn drop_collection(&mut self, id: GlobalId) {
1419 fail_point!("crash_on_drop");
1420
1421 self.ingestions.remove(&id);
1422 self.exports.remove(&id);
1423
1424 let _ = self.latest_status_updates.remove(&id);
1425
1426 // This will stop reporting of frontiers.
1427 //
1428 // If this object still has its frontiers reported, we will notify the
1429 // client envd of the drop.
1430 if self.reported_frontiers.remove(&id).is_some() {
1431 // The only actions left are internal cleanup, so we can commit to
1432 // the client that these objects have been dropped.
1433 //
1434 // This must be done now rather than in response to `DropDataflow`,
1435 // otherwise we introduce the possibility of a timing issue where:
1436 // - We remove all tracking state from the storage state and send
1437 // `DropDataflow` (i.e. this block).
1438 // - While waiting to process that command, we reconcile with a new
1439 // envd. That envd has already committed to its catalog that this
1440 // object no longer exists.
1441 // - We process the `DropDataflow` command, and identify that this
1442 // object has been dropped.
1443 // - The next time `dropped_ids` is processed, we send a response
1444 // that this ID has been dropped, but the upstream state has no
1445 // record of that object having ever existed.
1446 self.dropped_ids.push(id);
1447 }
1448
        // Send this through the async worker to ensure correct ordering with
        // `RunIngestion`; the dataflow is actually dropped when the async
        // worker's response is handled.
1451 if self.timely_worker_index == 0 {
1452 self.async_worker.drop_dataflow(id);
1453 }
1454 }
1455
1456 /// Drop the identified oneshot ingestion from the storage state.
1457 fn drop_oneshot_ingestion(&mut self, ingestion_id: uuid::Uuid) {
1458 let prev = self.oneshot_ingestions.remove(&ingestion_id);
1459 tracing::info!(%ingestion_id, existed = %prev.is_some(), "dropping oneshot ingestion");
1460 }
1461}