mz_storage/storage_state.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Worker-local state for storage timely instances.
7//!
8//! One instance of a [`Worker`], along with its contained [`StorageState`], is
9//! part of an ensemble of storage workers that all run inside the same timely
10//! cluster. We call this worker a _storage worker_ to disambiguate it from
11//! other kinds of workers, potentially other components that might be sharing
12//! the same timely cluster.
13//!
14//! ## Controller and internal communication
15//!
16//! A worker receives _external_ [`StorageCommands`](StorageCommand) from the
17//! storage controller, via a channel. Storage workers also share an _internal_
18//! control/command fabric ([`internal_control`]). Internal commands go through
19//! a sequencer dataflow that ensures that all workers receive all commands in
20//! the same consistent order.
21//!
22//! We need to make sure that commands that cause dataflows to be rendered are
23//! processed in the same consistent order across all workers because timely
24//! requires this. To achieve this, we make sure that only internal commands can
25//! cause dataflows to be rendered. External commands (from the controller)
26//! cause internal commands to be broadcast (by only one worker), to get
27//! dataflows rendered.
28//!
//! The internal command fabric is also used to broadcast messages from a local
//! operator/worker to all workers. For example, this is how we tear down and
//! restart a dataflow on all workers when an error is encountered.
32//!
33//! ## Async Storage Worker
34//!
35//! The storage worker has a companion [`AsyncStorageWorker`] that must be used
36//! when running code that requires `async`. This is needed because a timely
37//! main loop cannot run `async` code.
38//!
39//! ## Example flow of commands for `RunIngestion`
40//!
41//! With external commands, internal commands, and the async worker,
42//! understanding where and how commands from the controller are realized can
43//! get complicated. We will follow the complete flow for `RunIngestion`, as an
44//! example:
45//!
46//! 1. Worker receives a [`StorageCommand::RunIngestion`] command from the
47//! controller.
48//! 2. This command is processed in [`StorageState::handle_storage_command`].
49//! This step cannot render dataflows, because it does not have access to the
50//! timely worker. It will only set up state that stays over the whole
51//! lifetime of the source, such as the `reported_frontier`. Putting in place
52//! this reported frontier will enable frontier reporting for that source. We
53//! will not start reporting when we only see an internal command for
54//! rendering a dataflow, which can "overtake" the external `RunIngestion`
55//! command.
56//! 3. During processing of that command, we call
57//! [`AsyncStorageWorker::update_ingestion_frontiers`], which causes a command to
58//! be sent to the async worker.
59//! 4. We eventually get a response from the async worker:
60//! [`AsyncStorageWorkerResponse::IngestionFrontiersUpdated`].
61//! 5. This response is handled in [`Worker::handle_async_worker_response`].
//! 6. Handling that response causes an
//! [`InternalStorageCommand::CreateIngestionDataflow`] to be broadcast to
//! all workers via the internal command fabric.
65//! 7. This message will be processed (on each worker) in
66//! [`Worker::handle_internal_storage_command`]. This is what will cause the
67//! required dataflow to be rendered on all workers.
68//!
69//! The process described above assumes that the `RunIngestion` is _not_ an
70//! update, i.e. it is in response to a `CREATE SOURCE`-like statement.
71//!
//! The primary distinction when handling a `RunIngestion` that represents an
//! update is that it might fill out new internal state in the mid-level
//! clients on the way toward being run.
75
76use std::cell::RefCell;
77use std::collections::{BTreeMap, BTreeSet, VecDeque};
78use std::path::PathBuf;
79use std::rc::Rc;
80use std::sync::Arc;
81use std::thread;
82use std::time::Duration;
83
84use fail::fail_point;
85use mz_ore::now::NowFn;
86use mz_ore::soft_assert_or_log;
87use mz_ore::tracing::TracingHandle;
88use mz_persist_client::batch::ProtoBatch;
89use mz_persist_client::cache::PersistClientCache;
90use mz_persist_client::operators::shard_source::ErrorHandler;
91use mz_repr::{GlobalId, Timestamp};
92use mz_rocksdb::config::SharedWriteBufferManager;
93use mz_storage_client::client::{
94 RunIngestionCommand, StatusUpdate, StorageCommand, StorageResponse,
95};
96use mz_storage_types::AlterCompatible;
97use mz_storage_types::configuration::StorageConfiguration;
98use mz_storage_types::connections::ConnectionContext;
99use mz_storage_types::controller::CollectionMetadata;
100use mz_storage_types::dyncfgs::STORAGE_SERVER_MAINTENANCE_INTERVAL;
101use mz_storage_types::oneshot_sources::OneshotIngestionDescription;
102use mz_storage_types::sinks::StorageSinkDesc;
103use mz_storage_types::sources::IngestionDescription;
104use mz_timely_util::builder_async::PressOnDropButton;
105use mz_txn_wal::operator::TxnsContext;
106use timely::order::PartialOrder;
107use timely::progress::Timestamp as _;
108use timely::progress::frontier::Antichain;
109use timely::worker::Worker as TimelyWorker;
110use tokio::sync::mpsc::error::TryRecvError;
111use tokio::sync::{mpsc, watch};
112use tokio::time::Instant;
113use tracing::{debug, info, warn};
114use uuid::Uuid;
115
116use crate::internal_control::{
117 self, DataflowParameters, InternalCommandReceiver, InternalCommandSender,
118 InternalStorageCommand,
119};
120use crate::metrics::StorageMetrics;
121use crate::statistics::{AggregatedStatistics, SinkStatistics, SourceStatistics};
122use crate::storage_state::async_storage_worker::{AsyncStorageWorker, AsyncStorageWorkerResponse};
123
124pub mod async_storage_worker;
125
126type CommandReceiver = mpsc::UnboundedReceiver<StorageCommand>;
127type ResponseSender = mpsc::UnboundedSender<StorageResponse>;
128
/// State maintained for each worker thread.
///
/// Much of this state can be viewed as local variables for the worker thread,
/// holding state that persists across function calls. The same `Worker` is
/// reused across client connections: [`Worker::run`] serves each client
/// delivered on `client_rx` in turn.
pub struct Worker<'w> {
    /// The underlying Timely worker.
    ///
    /// NOTE: This is `pub` for testing.
    pub timely_worker: &'w mut TimelyWorker,
    /// The channel over which communication handles for newly connected clients
    /// are delivered. Each item is a connection nonce together with the command
    /// receiver and response sender for that client.
    pub client_rx: mpsc::UnboundedReceiver<(Uuid, CommandReceiver, ResponseSender)>,
    /// The state associated with collection ingress and egress. It is kept
    /// across client connections.
    pub storage_state: StorageState,
}
143}
144
145impl<'w> Worker<'w> {
146 /// Creates new `Worker` state from the given components.
147 pub fn new(
148 timely_worker: &'w mut TimelyWorker,
149 client_rx: mpsc::UnboundedReceiver<(Uuid, CommandReceiver, ResponseSender)>,
150 metrics: StorageMetrics,
151 now: NowFn,
152 connection_context: ConnectionContext,
153 instance_context: StorageInstanceContext,
154 persist_clients: Arc<PersistClientCache>,
155 txns_ctx: TxnsContext,
156 tracing_handle: Arc<TracingHandle>,
157 shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
158 ) -> Self {
159 // It is very important that we only create the internal control
160 // flow/command sequencer once because a) the worker state is re-used
161 // when a new client connects and b) dataflows that have already been
162 // rendered into the timely worker are reused as well.
163 //
164 // If we created a new sequencer every time we get a new client (likely
165 // because the controller re-started and re-connected), dataflows that
166 // were rendered before would still hold a handle to the old sequencer
167 // but we would not read their commands anymore.
168 let (internal_cmd_tx, internal_cmd_rx) =
169 internal_control::setup_command_sequencer(timely_worker);
170
171 let storage_configuration =
172 StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs());
173
174 // We always initialize as read_only=true. Only when we're explicitly
175 // allowed do we switch to doing writes.
176 let (read_only_tx, read_only_rx) = watch::channel(true);
177
178 // Similar to the internal command sequencer, it is very important that
179 // we only create the async worker once because a) the worker state is
180 // re-used when a new client connects and b) commands that have already
181 // been sent and might yield a response will be lost if a new iteration
182 // of `run_client` creates a new async worker.
183 //
184 // If we created a new async worker every time we get a new client
185 // (likely because the controller re-started and re-connected), we can
186 // get into an inconsistent state where we think that a dataflow has
187 // been rendered, for example because there is an entry in
188 // `StorageState::ingestions`, while there is not yet a dataflow. This
189 // happens because the dataflow only gets rendered once we get a
190 // response from the async worker and send off an internal command.
191 //
192 // The core idea is that both the sequencer and the async worker are
193 // part of the per-worker state, and must be treated as such, meaning
194 // they must survive between invocations of `run_client`.
195
196 // TODO(aljoscha): This thread unparking business seems brittle, but that's
197 // also how the command channel works currently. We can wrap it inside a
198 // struct that holds both a channel and a `Thread`, but I don't
199 // think that would help too much.
200 let async_worker = async_storage_worker::AsyncStorageWorker::new(
201 thread::current(),
202 Arc::clone(&persist_clients),
203 );
204 let cluster_memory_limit = instance_context.cluster_memory_limit;
205
206 let storage_state = StorageState {
207 source_uppers: BTreeMap::new(),
208 source_tokens: BTreeMap::new(),
209 metrics,
210 reported_frontiers: BTreeMap::new(),
211 ingestions: BTreeMap::new(),
212 exports: BTreeMap::new(),
213 oneshot_ingestions: BTreeMap::new(),
214 now,
215 timely_worker_index: timely_worker.index(),
216 timely_worker_peers: timely_worker.peers(),
217 instance_context,
218 persist_clients,
219 txns_ctx,
220 sink_tokens: BTreeMap::new(),
221 sink_write_frontiers: BTreeMap::new(),
222 dropped_ids: Vec::new(),
223 aggregated_statistics: AggregatedStatistics::new(
224 timely_worker.index(),
225 timely_worker.peers(),
226 ),
227 shared_status_updates: Default::default(),
228 latest_status_updates: Default::default(),
229 initial_status_reported: Default::default(),
230 internal_cmd_tx,
231 internal_cmd_rx,
232 read_only_tx,
233 read_only_rx,
234 async_worker,
235 storage_configuration,
236 dataflow_parameters: DataflowParameters::new(
237 shared_rocksdb_write_buffer_manager,
238 cluster_memory_limit,
239 ),
240 tracing_handle,
241 server_maintenance_interval: Duration::ZERO,
242 };
243
244 // TODO(aljoscha): We might want `async_worker` and `internal_cmd_tx` to
245 // be fields of `Worker` instead of `StorageState`, but at least for the
246 // command flow sources and sinks need access to that. We can refactor
247 // this once we have a clearer boundary between what sources/sinks need
248 // and the full "power" of the internal command flow, which should stay
249 // internal to the worker/not be exposed to source/sink implementations.
250 Self {
251 timely_worker,
252 client_rx,
253 storage_state,
254 }
255 }
256}
257
/// Worker-local state related to the ingress or egress of collections of data.
///
/// One `StorageState` is owned by each [`Worker`] and is kept across client
/// connections.
pub struct StorageState {
    /// The highest observed upper frontier for each collection.
    ///
    /// This is shared among all source instances, so that they can jointly advance the
    /// frontier even as other instances are created and dropped. Ideally, the Storage
    /// module would eventually provide one source of truth on this rather than multiple,
    /// and we should aim for that but are not there yet.
    pub source_uppers: BTreeMap<GlobalId, Rc<RefCell<Antichain<mz_repr::Timestamp>>>>,
    /// Handles to created sources, keyed by ID.
    ///
    /// NB: The type of the tokens must not be changed to something other than `PressOnDropButton`
    /// to prevent usage of custom shutdown tokens that are tricky to get right.
    pub source_tokens: BTreeMap<GlobalId, Vec<PressOnDropButton>>,
    /// Metrics for storage objects.
    pub metrics: StorageMetrics,
    /// Tracks the conditional write frontiers we have reported.
    pub reported_frontiers: BTreeMap<GlobalId, Antichain<Timestamp>>,
    /// Descriptions of each installed ingestion.
    pub ingestions: BTreeMap<GlobalId, IngestionDescription<CollectionMetadata>>,
    /// Descriptions of each installed export.
    pub exports: BTreeMap<GlobalId, StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>,
    /// Descriptions of oneshot ingestions that are currently running.
    pub oneshot_ingestions: BTreeMap<uuid::Uuid, OneshotIngestionDescription<ProtoBatch>>,
    /// The clock function (`NowFn`) handed to this worker in [`Worker::new`].
    pub now: NowFn,
    /// Index of the associated timely dataflow worker.
    pub timely_worker_index: usize,
    /// Peers in the associated timely dataflow worker.
    pub timely_worker_peers: usize,
    /// Other configuration for sources and sinks.
    pub instance_context: StorageInstanceContext,
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    /// This is intentionally shared between workers.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// Tokens that should be dropped when a dataflow is dropped to clean up
    /// associated state.
    ///
    /// NB: The type of the tokens must not be changed to something other than `PressOnDropButton`
    /// to prevent usage of custom shutdown tokens that are tricky to get right.
    pub sink_tokens: BTreeMap<GlobalId, Vec<PressOnDropButton>>,
    /// Frontier of sink writes (all subsequent writes will be at times at or
    /// beyond this frontier).
    pub sink_write_frontiers: BTreeMap<GlobalId, Rc<RefCell<Antichain<Timestamp>>>>,
    /// Collection ids that have been dropped but not yet reported as dropped.
    pub dropped_ids: Vec<GlobalId>,

    /// Statistics for sources and sinks.
    pub aggregated_statistics: AggregatedStatistics,

    /// A place shared with running dataflows, so that health operators can
    /// report status updates back to us.
    ///
    /// **NOTE**: Operators that append to this collection should take care to only add new
    /// status updates if the status of the ingestion/export in question has _changed_.
    pub shared_status_updates: Rc<RefCell<Vec<StatusUpdate>>>,

    /// The latest status update for each object.
    pub latest_status_updates: BTreeMap<GlobalId, StatusUpdate>,

    /// Whether we have reported the initial status after connecting to a new client.
    /// This is reset to false when a new client connects.
    pub initial_status_reported: bool,

    /// Sender for cluster-internal storage commands. These can be sent from
    /// within workers/operators and will be distributed to all workers. For
    /// example, for shutting down an entire dataflow from within an
    /// operator/worker.
    pub internal_cmd_tx: InternalCommandSender,
    /// Receiver for cluster-internal storage commands.
    pub internal_cmd_rx: InternalCommandReceiver,

    /// When this replica/cluster is in read-only mode it must not make any
    /// changes to external state. This flag can only be changed by a
    /// [StorageCommand::AllowWrites].
    ///
    /// Everything running on this replica/cluster must obey this flag. At the
    /// time of writing, nothing currently looks at this flag.
    /// TODO(benesch): fix this.
    ///
    /// NOTE: In the future, we might want a more complicated flag, for example
    /// something that tells us after which timestamp we are allowed to write.
    /// In this first version we are keeping things as simple as possible!
    pub read_only_rx: watch::Receiver<bool>,

    /// Send-side for read-only state.
    pub read_only_tx: watch::Sender<bool>,

    /// Async worker companion, used for running code that requires async, which
    /// the timely main loop cannot do.
    pub async_worker: AsyncStorageWorker<mz_repr::Timestamp>,

    /// Configuration for source and sink connections.
    pub storage_configuration: StorageConfiguration,
    /// Dynamically configurable parameters that control how dataflows are rendered.
    /// NOTE(guswynn): we should consider moving these into `storage_configuration`.
    pub dataflow_parameters: DataflowParameters,

    /// A process-global handle to tracing configuration.
    pub tracing_handle: Arc<TracingHandle>,

    /// Interval at which to perform server maintenance tasks. Set to a zero interval to
    /// perform maintenance with every `step_or_park` invocation.
    pub server_maintenance_interval: Duration,
}
363
364impl StorageState {
365 /// Return an error handler that triggers a suspend and restart of the corresponding storage
366 /// dataflow.
367 pub fn error_handler(&self, context: &'static str, id: GlobalId) -> ErrorHandler {
368 let tx = self.internal_cmd_tx.clone();
369 ErrorHandler::signal(move |e| {
370 tx.send(InternalStorageCommand::SuspendAndRestart {
371 id,
372 reason: format!("{context}: {e:#}"),
373 })
374 })
375 }
376}
377
/// Extra context for a storage instance.
///
/// This is extra information that is used when rendering sources
/// and sinks that is not tied to the source/connection configuration itself.
#[derive(Clone)]
pub struct StorageInstanceContext {
    /// A directory that can be used for scratch work.
    ///
    /// When this is `None`, [`StorageInstanceContext::new`] sets RocksDB up with
    /// an in-memory environment instead.
    pub scratch_directory: Option<PathBuf>,
    /// A global `rocksdb::Env`, shared across ALL instances of `RocksDB` (even
    /// across sources!). This `Env` lets us control some resources (like background threads)
    /// process-wide.
    pub rocksdb_env: rocksdb::Env,
    /// The memory limit of the materialize cluster replica. This will
    /// be used to calculate and configure the maximum inflight bytes for backpressure.
    pub cluster_memory_limit: Option<usize>,
}
393
394impl StorageInstanceContext {
395 /// Build a new `StorageInstanceContext`.
396 pub fn new(
397 scratch_directory: Option<PathBuf>,
398 cluster_memory_limit: Option<usize>,
399 ) -> Result<Self, anyhow::Error> {
400 // If no file system is available, fall back to running RocksDB in memory.
401 let rocksdb_env = if scratch_directory.is_some() {
402 rocksdb::Env::new()?
403 } else {
404 rocksdb::Env::mem_env()?
405 };
406
407 Ok(Self {
408 scratch_directory,
409 rocksdb_env,
410 cluster_memory_limit,
411 })
412 }
413}
414
415impl<'w> Worker<'w> {
416 /// Waits for client connections and runs them to completion.
417 pub fn run(&mut self) {
418 while let Some((_nonce, rx, tx)) = self.client_rx.blocking_recv() {
419 self.run_client(rx, tx);
420 }
421 }
422
423 /// Runs this (timely) storage worker until the given `command_rx` is
424 /// disconnected.
425 ///
426 /// See the [module documentation](crate::storage_state) for this
427 /// workers responsibilities, how it communicates with the other workers and
428 /// how commands flow from the controller and through the workers.
429 fn run_client(&mut self, mut command_rx: CommandReceiver, response_tx: ResponseSender) {
430 // At this point, all workers are still reading from the command flow.
431 if self.reconcile(&mut command_rx).is_err() {
432 return;
433 }
434
435 // The last time we reported statistics.
436 let mut last_stats_time = Instant::now();
437
438 // The last time we did periodic maintenance.
439 let mut last_maintenance = std::time::Instant::now();
440
441 let mut disconnected = false;
442 while !disconnected {
443 let config = &self.storage_state.storage_configuration;
444 let stats_interval = config.parameters.statistics_collection_interval;
445
446 let maintenance_interval = self.storage_state.server_maintenance_interval;
447
448 let now = std::time::Instant::now();
449 // Determine if we need to perform maintenance, which is true if `maintenance_interval`
450 // time has passed since the last maintenance.
451 let sleep_duration;
452 if now >= last_maintenance + maintenance_interval {
453 last_maintenance = now;
454 sleep_duration = None;
455
456 self.report_frontier_progress(&response_tx);
457 } else {
458 // We didn't perform maintenance, sleep until the next maintenance interval.
459 let next_maintenance = last_maintenance + maintenance_interval;
460 sleep_duration = Some(next_maintenance.saturating_duration_since(now))
461 }
462
463 // Ask Timely to execute a unit of work.
464 //
465 // If there are no pending commands or responses from the async
466 // worker, we ask Timely to park the thread if there's nothing to
467 // do. We rely on another thread unparking us when there's new work
468 // to be done, e.g., when sending a command or when new Kafka
469 // messages have arrived.
470 //
471 // It is critical that we allow Timely to park iff there are no
472 // pending commands or responses. The command may have already been
473 // consumed by the call to `client_rx.recv`. See:
474 // https://github.com/MaterializeInc/materialize/pull/13973#issuecomment-1200312212
475 if command_rx.is_empty() && self.storage_state.async_worker.is_empty() {
476 // Make sure we wake up again to report any pending statistics updates.
477 let mut park_duration = stats_interval.saturating_sub(last_stats_time.elapsed());
478 if let Some(sleep_duration) = sleep_duration {
479 park_duration = std::cmp::min(sleep_duration, park_duration);
480 }
481 self.timely_worker.step_or_park(Some(park_duration));
482 } else {
483 self.timely_worker.step();
484 }
485
486 // Rerport any dropped ids
487 for id in std::mem::take(&mut self.storage_state.dropped_ids) {
488 self.send_storage_response(&response_tx, StorageResponse::DroppedId(id));
489 }
490
491 self.process_oneshot_ingestions(&response_tx);
492
493 self.report_status_updates(&response_tx);
494
495 if last_stats_time.elapsed() >= stats_interval {
496 self.report_storage_statistics(&response_tx);
497 last_stats_time = Instant::now();
498 }
499
500 // Handle any received commands.
501 loop {
502 match command_rx.try_recv() {
503 Ok(cmd) => self.storage_state.handle_storage_command(cmd),
504 Err(TryRecvError::Empty) => break,
505 Err(TryRecvError::Disconnected) => {
506 disconnected = true;
507 break;
508 }
509 }
510 }
511
512 // Handle responses from the async worker.
513 while let Ok(response) = self.storage_state.async_worker.try_recv() {
514 self.handle_async_worker_response(response);
515 }
516
517 // Handle any received commands.
518 while let Some(command) = self.storage_state.internal_cmd_rx.try_recv() {
519 self.handle_internal_storage_command(command);
520 }
521 }
522 }
523
524 /// Entry point for applying a response from the async storage worker.
525 pub fn handle_async_worker_response(
526 &self,
527 async_response: AsyncStorageWorkerResponse<mz_repr::Timestamp>,
528 ) {
529 // NOTE: If we want to share the load of async processing we
530 // have to change `handle_storage_command` and change this
531 // assert.
532 assert_eq!(
533 self.timely_worker.index(),
534 0,
535 "only worker #0 is doing async processing"
536 );
537 match async_response {
538 AsyncStorageWorkerResponse::IngestionFrontiersUpdated {
539 id,
540 ingestion_description,
541 as_of,
542 resume_uppers,
543 source_resume_uppers,
544 } => {
545 self.storage_state.internal_cmd_tx.send(
546 InternalStorageCommand::CreateIngestionDataflow {
547 id,
548 ingestion_description,
549 as_of,
550 resume_uppers,
551 source_resume_uppers,
552 },
553 );
554 }
555 AsyncStorageWorkerResponse::ExportFrontiersUpdated { id, description } => {
556 self.storage_state
557 .internal_cmd_tx
558 .send(InternalStorageCommand::RunSinkDataflow(id, description));
559 }
560 AsyncStorageWorkerResponse::DropDataflow(id) => {
561 self.storage_state
562 .internal_cmd_tx
563 .send(InternalStorageCommand::DropDataflow(vec![id]));
564 }
565 }
566 }
567
568 /// Entry point for applying an internal storage command.
569 pub fn handle_internal_storage_command(&mut self, internal_cmd: InternalStorageCommand) {
570 match internal_cmd {
571 InternalStorageCommand::SuspendAndRestart { id, reason } => {
572 info!(
573 "worker {}/{} initiating suspend-and-restart for {id} because of: {reason}",
574 self.timely_worker.index(),
575 self.timely_worker.peers(),
576 );
577
578 let maybe_ingestion = self.storage_state.ingestions.get(&id).cloned();
579 if let Some(ingestion_description) = maybe_ingestion {
580 // Yank the token of the previously existing source dataflow.Note that this
581 // token also includes any source exports/subsources.
582 let maybe_token = self.storage_state.source_tokens.remove(&id);
583 if maybe_token.is_none() {
584 // Something has dropped the source. Make sure we don't
585 // accidentally re-create it.
586 return;
587 }
588
589 // This needs to be done by one worker, which will
590 // broadcasts a `CreateIngestionDataflow` command to all
591 // workers based on the response that contains the
592 // resumption upper.
593 //
594 // Doing this separately on each worker could lead to
595 // differing resume_uppers which might lead to all kinds of
596 // mayhem.
597 //
598 // TODO(aljoscha): If we ever become worried that this is
599 // putting undue pressure on worker 0 we can pick the
600 // designated worker for a source/sink based on `id.hash()`.
601 if self.timely_worker.index() == 0 {
602 for (id, _) in ingestion_description.source_exports.iter() {
603 self.storage_state
604 .aggregated_statistics
605 .advance_global_epoch(*id);
606 }
607 self.storage_state
608 .async_worker
609 .update_ingestion_frontiers(id, ingestion_description);
610 }
611
612 // Continue with other commands.
613 return;
614 }
615
616 let maybe_sink = self.storage_state.exports.get(&id).cloned();
617 if let Some(sink_description) = maybe_sink {
618 // Yank the token of the previously existing sink
619 // dataflow.
620 let maybe_token = self.storage_state.sink_tokens.remove(&id);
621
622 if maybe_token.is_none() {
623 // Something has dropped the sink. Make sure we don't
624 // accidentally re-create it.
625 return;
626 }
627
628 // This needs to be broadcast by one worker and go through
629 // the internal command fabric, to ensure consistent
630 // ordering of dataflow rendering across all workers.
631 if self.timely_worker.index() == 0 {
632 self.storage_state
633 .aggregated_statistics
634 .advance_global_epoch(id);
635 self.storage_state
636 .async_worker
637 .update_sink_frontiers(id, sink_description);
638 }
639
640 // Continue with other commands.
641 return;
642 }
643
644 if !self
645 .storage_state
646 .ingestions
647 .values()
648 .any(|v| v.source_exports.contains_key(&id))
649 {
650 // Our current approach to dropping a source results in a race between shard
651 // finalization (which happens in the controller) and dataflow shutdown (which
652 // happens in clusterd). If a source is created and dropped fast enough -or the
653 // two commands get sufficiently delayed- then it's possible to receive a
654 // SuspendAndRestart command for an unknown source. We cannot assert that this
655 // never happens but we log an error here to track how often this happens.
656 warn!(
657 "got InternalStorageCommand::SuspendAndRestart for something that is not a source or sink: {id}"
658 );
659 }
660 }
661 InternalStorageCommand::CreateIngestionDataflow {
662 id: ingestion_id,
663 mut ingestion_description,
664 as_of,
665 mut resume_uppers,
666 mut source_resume_uppers,
667 } => {
668 info!(
669 ?as_of,
670 ?resume_uppers,
671 "worker {}/{} trying to (re-)start ingestion {ingestion_id}",
672 self.timely_worker.index(),
673 self.timely_worker.peers(),
674 );
675
676 // We initialize statistics before we prune finished exports. We
677 // still want to export statistics for these, plus the rendering
678 // machinery will get confused if there are not at least
679 // statistics for the "main" source.
680 for (export_id, export) in ingestion_description.source_exports.iter() {
681 let resume_upper = resume_uppers[export_id].clone();
682 self.storage_state.aggregated_statistics.initialize_source(
683 *export_id,
684 ingestion_id,
685 resume_upper.clone(),
686 || {
687 SourceStatistics::new(
688 *export_id,
689 self.storage_state.timely_worker_index,
690 &self.storage_state.metrics.source_statistics,
691 ingestion_id,
692 &export.storage_metadata.data_shard,
693 export.data_config.envelope.clone(),
694 resume_upper,
695 )
696 },
697 );
698 }
699
700 let finished_exports: BTreeSet<GlobalId> = resume_uppers
701 .iter()
702 .filter(|(_, frontier)| frontier.is_empty())
703 .map(|(id, _)| *id)
704 .collect();
705
706 resume_uppers.retain(|id, _| !finished_exports.contains(id));
707 source_resume_uppers.retain(|id, _| !finished_exports.contains(id));
708 ingestion_description
709 .source_exports
710 .retain(|id, _| !finished_exports.contains(id));
711
712 for id in ingestion_description.collection_ids() {
713 // If there is already a shared upper, we re-use it, to make
714 // sure that parties that are already using the shared upper
715 // can continue doing so.
716 let source_upper = self
717 .storage_state
718 .source_uppers
719 .entry(id.clone())
720 .or_insert_with(|| {
721 Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())))
722 });
723
724 let mut source_upper = source_upper.borrow_mut();
725 if !source_upper.is_empty() {
726 source_upper.clear();
727 source_upper.insert(mz_repr::Timestamp::minimum());
728 }
729 }
730
731 // If all subsources of the source are finished, we can skip rendering entirely.
732 // Also, if `as_of` is empty, the dataflow has been finalized, so we can skip it as
733 // well.
734 //
735 // TODO(guswynn|petrosagg): this is a bit hacky, and is a consequence of storage state
736 // management being a bit of a mess. we should clean this up and remove weird if
737 // statements like this.
738 if resume_uppers.values().all(|frontier| frontier.is_empty()) || as_of.is_empty() {
739 info!(
740 ?resume_uppers,
741 ?as_of,
742 "worker {}/{} skipping building ingestion dataflow \
743 for {ingestion_id} because the ingestion is finished",
744 self.timely_worker.index(),
745 self.timely_worker.peers(),
746 );
747 return;
748 }
749
750 crate::render::build_ingestion_dataflow(
751 self.timely_worker,
752 &mut self.storage_state,
753 ingestion_id,
754 ingestion_description,
755 as_of,
756 resume_uppers,
757 source_resume_uppers,
758 );
759 }
760 InternalStorageCommand::RunOneshotIngestion {
761 ingestion_id,
762 collection_id,
763 collection_meta,
764 request,
765 } => {
766 crate::render::build_oneshot_ingestion_dataflow(
767 self.timely_worker,
768 &mut self.storage_state,
769 ingestion_id,
770 collection_id,
771 collection_meta,
772 request,
773 );
774 }
775 InternalStorageCommand::RunSinkDataflow(sink_id, sink_description) => {
776 info!(
777 "worker {}/{} trying to (re-)start sink {sink_id}",
778 self.timely_worker.index(),
779 self.timely_worker.peers(),
780 );
781
782 {
783 // If there is already a shared write frontier, we re-use it, to
784 // make sure that parties that are already using the shared
785 // frontier can continue doing so.
786 let sink_write_frontier = self
787 .storage_state
788 .sink_write_frontiers
789 .entry(sink_id.clone())
790 .or_insert_with(|| Rc::new(RefCell::new(Antichain::new())));
791
792 let mut sink_write_frontier = sink_write_frontier.borrow_mut();
793 sink_write_frontier.clear();
794 sink_write_frontier.insert(mz_repr::Timestamp::minimum());
795 }
796 self.storage_state
797 .aggregated_statistics
798 .initialize_sink(sink_id, || {
799 SinkStatistics::new(
800 sink_id,
801 self.storage_state.timely_worker_index,
802 &self.storage_state.metrics.sink_statistics,
803 )
804 });
805
806 crate::render::build_export_dataflow(
807 self.timely_worker,
808 &mut self.storage_state,
809 sink_id,
810 sink_description,
811 );
812 }
813 InternalStorageCommand::DropDataflow(ids) => {
814 for id in &ids {
815 // Clean up per-source / per-sink state.
816 self.storage_state.source_uppers.remove(id);
817 self.storage_state.source_tokens.remove(id);
818
819 self.storage_state.sink_tokens.remove(id);
820 self.storage_state.sink_write_frontiers.remove(id);
821
822 self.storage_state.aggregated_statistics.deinitialize(*id);
823 }
824 }
825 InternalStorageCommand::UpdateConfiguration { storage_parameters } => {
826 self.storage_state
827 .dataflow_parameters
828 .update(storage_parameters.clone());
829 self.storage_state
830 .storage_configuration
831 .update(storage_parameters);
832
833 // Clear out the updates as we no longer forward them to anyone else to process.
834 // We clone `StorageState::storage_configuration` many times during rendering
835 // and want to avoid cloning these unused updates.
836 self.storage_state
837 .storage_configuration
838 .parameters
839 .dyncfg_updates = Default::default();
840
841 // Remember the maintenance interval locally to avoid reading it from the config set on
842 // every server iteration.
843 self.storage_state.server_maintenance_interval =
844 STORAGE_SERVER_MAINTENANCE_INTERVAL
845 .get(self.storage_state.storage_configuration.config_set());
846
847 // Gate the upsert source-stash's use of the column pager. The
848 // pager's budget pool, backend, and codec are the shared ones
849 // configured by compute's `apply_worker_config` (compute and
850 // storage run in the same process); storage only decides whether
851 // its stash participates, via its own dyncfg.
852 {
853 use mz_storage_types::dyncfgs::ENABLE_UPSERT_PAGED_SPILL;
854
855 let enabled = ENABLE_UPSERT_PAGED_SPILL
856 .get(self.storage_state.storage_configuration.config_set());
857 crate::upsert::upsert_stash_pager::set_enabled(enabled);
858 }
859 }
860 InternalStorageCommand::StatisticsUpdate { sources, sinks } => self
861 .storage_state
862 .aggregated_statistics
863 .ingest(sources, sinks),
864 }
865 }
866
867 /// Emit information about write frontier progress, along with information that should
868 /// be made durable for this to be the case.
869 ///
870 /// The write frontier progress is "conditional" in that it is not until the information is made
871 /// durable that the data are emitted to downstream workers, and indeed they should not rely on
872 /// the completeness of what they hear until the information is made durable.
873 ///
874 /// Specifically, this sends information about new timestamp bindings created by dataflow workers,
875 /// with the understanding if that if made durable (and ack'd back to the workers) the source will
876 /// in fact progress with this write frontier.
877 pub fn report_frontier_progress(&mut self, response_tx: &ResponseSender) {
878 let mut new_uppers = Vec::new();
879
880 // Check if any observed frontier should advance the reported frontiers.
881 for (id, frontier) in self
882 .storage_state
883 .source_uppers
884 .iter()
885 .chain(self.storage_state.sink_write_frontiers.iter())
886 {
887 let Some(reported_frontier) = self.storage_state.reported_frontiers.get_mut(id) else {
888 // Frontier reporting has not yet been started for this object.
889 // Potentially because this timely worker has not yet seen the
890 // `CreateSources` command.
891 continue;
892 };
893
894 let observed_frontier = frontier.borrow();
895
896 // Only do a thing if it *advances* the frontier, not just *changes* the frontier.
897 // This is protection against `frontier` lagging behind what we have conditionally reported.
898 if PartialOrder::less_than(reported_frontier, &observed_frontier) {
899 new_uppers.push((*id, observed_frontier.clone()));
900 reported_frontier.clone_from(&observed_frontier);
901 }
902 }
903
904 for (id, upper) in new_uppers {
905 self.send_storage_response(response_tx, StorageResponse::FrontierUpper(id, upper));
906 }
907 }
908
909 /// Pumps latest status updates from the buffer shared with operators and
910 /// reports any updates that need reporting.
911 pub fn report_status_updates(&mut self, response_tx: &ResponseSender) {
912 // If we haven't done the initial status report, report all current statuses
913 if !self.storage_state.initial_status_reported {
914 // We pull initially reported status updates to "now", so that they
915 // sort as the latest update in internal status collections. This
916 // makes it so that a newly bootstrapped envd can append status
917 // updates to internal status collections that report an accurate
918 // view as of the time when they came up.
919 let now_ts = mz_ore::now::to_datetime((self.storage_state.now)());
920 let status_updates = self
921 .storage_state
922 .latest_status_updates
923 .values()
924 .cloned()
925 .map(|mut update| {
926 update.timestamp = now_ts.clone();
927 update
928 });
929 for update in status_updates {
930 self.send_storage_response(response_tx, StorageResponse::StatusUpdate(update));
931 }
932 self.storage_state.initial_status_reported = true;
933 }
934
935 // Pump updates into our state and stage them for reporting.
936 for shared_update in self.storage_state.shared_status_updates.take() {
937 self.send_storage_response(
938 response_tx,
939 StorageResponse::StatusUpdate(shared_update.clone()),
940 );
941
942 self.storage_state
943 .latest_status_updates
944 .insert(shared_update.id, shared_update);
945 }
946 }
947
948 /// Report source statistics back to the controller.
949 pub fn report_storage_statistics(&mut self, response_tx: &ResponseSender) {
950 let (sources, sinks) = self.storage_state.aggregated_statistics.emit_local();
951 if !sources.is_empty() || !sinks.is_empty() {
952 self.storage_state
953 .internal_cmd_tx
954 .send(InternalStorageCommand::StatisticsUpdate { sources, sinks })
955 }
956
957 let (sources, sinks) = self.storage_state.aggregated_statistics.snapshot();
958 if !sources.is_empty() || !sinks.is_empty() {
959 self.send_storage_response(
960 response_tx,
961 StorageResponse::StatisticsUpdates(sources, sinks),
962 );
963 }
964 }
965
966 /// Send a response to the coordinator.
967 fn send_storage_response(&self, response_tx: &ResponseSender, response: StorageResponse) {
968 // Ignore send errors because the coordinator is free to ignore our
969 // responses. This happens during shutdown.
970 let _ = response_tx.send(response);
971 }
972
973 fn process_oneshot_ingestions(&mut self, response_tx: &ResponseSender) {
974 for (ingestion_id, ingestion_state) in &mut self.storage_state.oneshot_ingestions {
975 loop {
976 match ingestion_state.results.try_recv() {
977 Ok(result) => {
978 let response = match result {
979 Ok(maybe_batch) => maybe_batch.into_iter().map(Result::Ok).collect(),
980 Err(err) => vec![Err(err)],
981 };
982 let staged_batches = BTreeMap::from([(*ingestion_id, response)]);
983 let _ = response_tx.send(StorageResponse::StagedBatches(staged_batches));
984 }
985 Err(TryRecvError::Empty) => {
986 break;
987 }
988 Err(TryRecvError::Disconnected) => {
989 break;
990 }
991 }
992 }
993 }
994 }
995
996 /// Extract commands until `InitializationComplete`, and make the worker
997 /// reflect those commands. If the worker can not be made to reflect the
998 /// commands, return an error.
999 fn reconcile(&mut self, command_rx: &mut CommandReceiver) -> Result<(), ()> {
1000 let worker_id = self.timely_worker.index();
1001
1002 // To initialize the connection, we want to drain all commands until we
1003 // receive a `StorageCommand::InitializationComplete` command to form a
1004 // target command state.
1005 let mut commands = vec![];
1006 loop {
1007 match command_rx.blocking_recv().ok_or(())? {
1008 StorageCommand::InitializationComplete => break,
1009 command => commands.push(command),
1010 }
1011 }
1012
1013 // Track which frontiers this envd expects; we will also set their
1014 // initial timestamp to the minimum timestamp to reset them as we don't
1015 // know what frontiers the new envd expects.
1016 let mut expected_objects = BTreeSet::new();
1017
1018 let mut drop_commands = BTreeSet::new();
1019 let mut running_ingestion_descriptions = self.storage_state.ingestions.clone();
1020 let mut running_exports_descriptions = self.storage_state.exports.clone();
1021
1022 let mut create_oneshot_ingestions: BTreeSet<Uuid> = BTreeSet::new();
1023 let mut cancel_oneshot_ingestions: BTreeSet<Uuid> = BTreeSet::new();
1024
1025 for command in &mut commands {
1026 match command {
1027 StorageCommand::Hello { .. } => {
1028 panic!("Hello must be captured before")
1029 }
1030 StorageCommand::AllowCompaction(id, since) => {
1031 info!(%worker_id, ?id, ?since, "reconcile: received AllowCompaction command");
1032
1033 // collect all "drop commands". These are `AllowCompaction`
1034 // commands that compact to the empty since. Then, later, we make sure
1035 // we retain only those `Create*` commands that are not dropped. We
1036 // assume that the `AllowCompaction` command is ordered after the
1037 // `Create*` commands but don't assert that.
1038 // WIP: Should we assert?
1039 if since.is_empty() {
1040 drop_commands.insert(*id);
1041 }
1042 }
1043 StorageCommand::RunIngestion(ingestion) => {
1044 info!(%worker_id, ?ingestion, "reconcile: received RunIngestion command");
1045
1046 // Ensure that ingestions are forward-rolling alter compatible.
1047 let prev = running_ingestion_descriptions
1048 .insert(ingestion.id, ingestion.description.clone());
1049
1050 if let Some(prev_ingest) = prev {
1051 // If the new ingestion is not exactly equal to the currently running
1052 // ingestion, we must either track that we need to synthesize an update
1053 // command to change the ingestion, or panic.
1054 prev_ingest
1055 .alter_compatible(ingestion.id, &ingestion.description)
1056 .expect("only alter compatible ingestions permitted");
1057 }
1058 }
1059 StorageCommand::RunSink(export) => {
1060 info!(%worker_id, ?export, "reconcile: received RunSink command");
1061
1062 // Ensure that exports are forward-rolling alter compatible.
1063 let prev =
1064 running_exports_descriptions.insert(export.id, export.description.clone());
1065
1066 if let Some(prev_export) = prev {
1067 prev_export
1068 .alter_compatible(export.id, &export.description)
1069 .expect("only alter compatible exports permitted");
1070 }
1071 }
1072 StorageCommand::RunOneshotIngestion(ingestion) => {
1073 info!(%worker_id, ?ingestion, "reconcile: received RunOneshotIngestion command");
1074 create_oneshot_ingestions.insert(ingestion.ingestion_id);
1075 }
1076 StorageCommand::CancelOneshotIngestion(uuid) => {
1077 info!(%worker_id, %uuid, "reconcile: received CancelOneshotIngestion command");
1078 cancel_oneshot_ingestions.insert(*uuid);
1079 }
1080 StorageCommand::InitializationComplete
1081 | StorageCommand::AllowWrites
1082 | StorageCommand::UpdateConfiguration(_) => (),
1083 }
1084 }
1085
1086 let mut seen_most_recent_definition = BTreeSet::new();
1087
1088 // We iterate over this backward to ensure that we keep only the most recent ingestion
1089 // description.
1090 let mut filtered_commands = VecDeque::new();
1091 for mut command in commands.into_iter().rev() {
1092 let mut should_keep = true;
1093 match &mut command {
1094 StorageCommand::Hello { .. } => {
1095 panic!("Hello must be captured before")
1096 }
1097 StorageCommand::RunIngestion(ingestion) => {
1098 // Subsources can be dropped independently of their
1099 // primary source, so we evaluate them in a separate
1100 // loop.
1101 for export_id in ingestion
1102 .description
1103 .source_exports
1104 .keys()
1105 .filter(|export_id| **export_id != ingestion.id)
1106 {
1107 if drop_commands.remove(export_id) {
1108 info!(%worker_id, %export_id, "reconcile: dropping subsource");
1109 self.storage_state.dropped_ids.push(*export_id);
1110 }
1111 }
1112
1113 if drop_commands.remove(&ingestion.id)
1114 || self.storage_state.dropped_ids.contains(&ingestion.id)
1115 {
1116 info!(%worker_id, %ingestion.id, "reconcile: dropping ingestion");
1117
1118 // If an ingestion is dropped, so too must all of
1119 // its subsources (i.e. ingestion exports, as well
1120 // as its progress subsource).
1121 for id in ingestion.description.collection_ids() {
1122 drop_commands.remove(&id);
1123 self.storage_state.dropped_ids.push(id);
1124 }
1125 should_keep = false;
1126 } else {
1127 let most_recent_defintion =
1128 seen_most_recent_definition.insert(ingestion.id);
1129
1130 if most_recent_defintion {
1131 // If this is the most recent definition, this
1132 // is what we will be running when
1133 // reconciliation completes. This definition
1134 // must not include any dropped subsources.
1135 ingestion.description.source_exports.retain(|export_id, _| {
1136 !self.storage_state.dropped_ids.contains(export_id)
1137 });
1138
1139 // After clearing any dropped subsources, we can
1140 // state that we expect all of these to exist.
1141 expected_objects.extend(ingestion.description.collection_ids());
1142 }
1143
1144 let running_ingestion = self.storage_state.ingestions.get(&ingestion.id);
1145
1146 // We keep only:
1147 // - The most recent version of the ingestion, which
1148 // is why these commands are run in reverse.
1149 // - Ingestions whose descriptions are not exactly
1150 // those that are currently running.
1151 should_keep = most_recent_defintion
1152 && running_ingestion != Some(&ingestion.description)
1153 }
1154 }
1155 StorageCommand::RunSink(export) => {
1156 if drop_commands.remove(&export.id)
1157 // If there were multiple `RunSink` in the command
1158 // stream, we want to ensure none of them are
1159 // retained.
1160 || self.storage_state.dropped_ids.contains(&export.id)
1161 {
1162 info!(%worker_id, %export.id, "reconcile: dropping sink");
1163
1164 // Make sure that we report back that the ID was
1165 // dropped.
1166 self.storage_state.dropped_ids.push(export.id);
1167
1168 should_keep = false
1169 } else {
1170 expected_objects.insert(export.id);
1171
1172 let running_sink = self.storage_state.exports.get(&export.id);
1173
1174 // We keep only:
1175 // - The most recent version of the sink, which
1176 // is why these commands are run in reverse.
1177 // - Sinks whose descriptions are not exactly
1178 // those that are currently running.
1179 should_keep = seen_most_recent_definition.insert(export.id)
1180 && running_sink != Some(&export.description);
1181 }
1182 }
1183 StorageCommand::RunOneshotIngestion(ingestion) => {
1184 let already_running = self
1185 .storage_state
1186 .oneshot_ingestions
1187 .contains_key(&ingestion.ingestion_id);
1188 let was_canceled = cancel_oneshot_ingestions.contains(&ingestion.ingestion_id);
1189
1190 should_keep = !already_running && !was_canceled;
1191 }
1192 StorageCommand::CancelOneshotIngestion(ingestion_id) => {
1193 let already_running = self
1194 .storage_state
1195 .oneshot_ingestions
1196 .contains_key(ingestion_id);
1197 should_keep = already_running;
1198 }
1199 StorageCommand::InitializationComplete
1200 | StorageCommand::AllowWrites
1201 | StorageCommand::UpdateConfiguration(_)
1202 | StorageCommand::AllowCompaction(_, _) => (),
1203 }
1204 if should_keep {
1205 filtered_commands.push_front(command);
1206 }
1207 }
1208 let commands = filtered_commands;
1209
1210 // Make sure all the "drop commands" matched up with a source or sink.
1211 // This is also what the regular handler logic for `AllowCompaction`
1212 // would do.
1213 soft_assert_or_log!(
1214 drop_commands.is_empty(),
1215 "AllowCompaction commands for non-existent IDs {:?}",
1216 drop_commands
1217 );
1218
1219 // Determine the ID of all objects we did _not_ see; these are
1220 // considered stale.
1221 let stale_objects = self
1222 .storage_state
1223 .ingestions
1224 .values()
1225 .map(|i| i.collection_ids())
1226 .flatten()
1227 .chain(self.storage_state.exports.keys().copied())
1228 // Objects are considered stale if we did not see them re-created.
1229 .filter(|id| !expected_objects.contains(id))
1230 .collect::<Vec<_>>();
1231 let stale_oneshot_ingestions = self
1232 .storage_state
1233 .oneshot_ingestions
1234 .keys()
1235 .filter(|ingestion_id| {
1236 let to_create = create_oneshot_ingestions.contains(ingestion_id);
1237 let to_drop = cancel_oneshot_ingestions.contains(ingestion_id);
1238 mz_ore::soft_assert_or_log!(
1239 !(!to_create && to_drop),
1240 "attempting to drop oneshot source {ingestion_id} that is not expected to be created during reconciliation"
1241 );
1242 !to_create && !to_drop
1243 })
1244 .copied()
1245 .collect::<Vec<_>>();
1246
1247 info!(
1248 %worker_id, ?expected_objects, ?stale_objects, ?stale_oneshot_ingestions,
1249 "reconcile: modifing storage state to match expected objects",
1250 );
1251
1252 for id in stale_objects {
1253 self.storage_state.drop_collection(id);
1254 }
1255 for id in stale_oneshot_ingestions {
1256 self.storage_state.drop_oneshot_ingestion(id);
1257 }
1258
1259 // Do not report dropping any objects that do not belong to expected
1260 // objects.
1261 self.storage_state
1262 .dropped_ids
1263 .retain(|id| expected_objects.contains(id));
1264
1265 // Do not report any frontiers that do not belong to expected objects.
1266 // Note that this set of objects can differ from the set of sources and
1267 // sinks.
1268 self.storage_state
1269 .reported_frontiers
1270 .retain(|id, _| expected_objects.contains(id));
1271
1272 // Reset the reported frontiers for the remaining objects.
1273 for (_, frontier) in &mut self.storage_state.reported_frontiers {
1274 *frontier = Antichain::from_elem(<_>::minimum());
1275 }
1276
1277 // Reset the initial status reported flag when a new client connects
1278 self.storage_state.initial_status_reported = false;
1279
1280 // Execute the modified commands.
1281 for command in commands {
1282 self.storage_state.handle_storage_command(command);
1283 }
1284
1285 Ok(())
1286 }
1287}
1288
1289impl StorageState {
1290 /// Entry point for applying a storage command.
1291 ///
1292 /// NOTE: This does not have access to the timely worker and therefore
1293 /// cannot render dataflows. For dataflow rendering, this needs to either
1294 /// send asynchronous command to the `async_worker` or internal
1295 /// commands to the `internal_cmd_tx`.
1296 pub fn handle_storage_command(&mut self, cmd: StorageCommand) {
1297 match cmd {
1298 StorageCommand::Hello { .. } => panic!("Hello must be captured before"),
1299 StorageCommand::InitializationComplete => (),
1300 StorageCommand::AllowWrites => {
1301 self.read_only_tx
1302 .send(false)
1303 .expect("we're holding one other end");
1304 self.persist_clients.cfg().enable_compaction();
1305 }
1306 StorageCommand::UpdateConfiguration(params) => {
1307 // These can be done from all workers safely.
1308 debug!("Applying configuration update: {params:?}");
1309
1310 // We serialize the dyncfg updates in StorageParameters, but configure
1311 // persist separately.
1312 self.persist_clients
1313 .cfg()
1314 .apply_from(¶ms.dyncfg_updates);
1315
1316 params.tracing.apply(self.tracing_handle.as_ref());
1317
1318 if let Some(log_filter) = ¶ms.tracing.log_filter {
1319 self.storage_configuration
1320 .connection_context
1321 .librdkafka_log_level =
1322 mz_ore::tracing::crate_level(&log_filter.clone().into(), "librdkafka");
1323 }
1324
1325 // This needs to be broadcast by one worker and go through
1326 // the internal command fabric, to ensure consistent
1327 // ordering of dataflow rendering across all workers.
1328 if self.timely_worker_index == 0 {
1329 self.internal_cmd_tx
1330 .send(InternalStorageCommand::UpdateConfiguration {
1331 storage_parameters: *params,
1332 })
1333 }
1334 }
1335 StorageCommand::RunIngestion(ingestion) => {
1336 let RunIngestionCommand { id, description } = *ingestion;
1337
1338 // Remember the ingestion description to facilitate possible
1339 // reconciliation later.
1340 self.ingestions.insert(id, description.clone());
1341
1342 // Initialize shared frontier reporting.
1343 for id in description.collection_ids() {
1344 self.reported_frontiers
1345 .entry(id)
1346 .or_insert_with(|| Antichain::from_elem(mz_repr::Timestamp::minimum()));
1347 }
1348
1349 // This needs to be done by one worker, which will broadcasts a
1350 // `CreateIngestionDataflow` command to all workers based on the response that
1351 // contains the resumption upper.
1352 //
1353 // Doing this separately on each worker could lead to differing resume_uppers
1354 // which might lead to all kinds of mayhem.
1355 //
1356 // n.b. the ingestion on each worker uses the description from worker 0––not the
1357 // ingestion in the local storage state. This is something we might have
1358 // interest in fixing in the future, e.g. materialize#19907
1359 if self.timely_worker_index == 0 {
1360 self.async_worker
1361 .update_ingestion_frontiers(id, description);
1362 }
1363 }
1364 StorageCommand::RunOneshotIngestion(oneshot) => {
1365 if self.timely_worker_index == 0 {
1366 self.internal_cmd_tx
1367 .send(InternalStorageCommand::RunOneshotIngestion {
1368 ingestion_id: oneshot.ingestion_id,
1369 collection_id: oneshot.collection_id,
1370 collection_meta: oneshot.collection_meta,
1371 request: oneshot.request,
1372 });
1373 }
1374 }
1375 StorageCommand::CancelOneshotIngestion(id) => {
1376 self.drop_oneshot_ingestion(id);
1377 }
1378 StorageCommand::RunSink(export) => {
1379 // Remember the sink description to facilitate possible
1380 // reconciliation later.
1381 let prev = self.exports.insert(export.id, export.description.clone());
1382
1383 // New sink, add state.
1384 if prev.is_none() {
1385 self.reported_frontiers.insert(
1386 export.id,
1387 Antichain::from_elem(mz_repr::Timestamp::minimum()),
1388 );
1389 }
1390
1391 // This needs to be broadcast by one worker and go through the internal command
1392 // fabric, to ensure consistent ordering of dataflow rendering across all
1393 // workers.
1394 if self.timely_worker_index == 0 {
1395 self.internal_cmd_tx
1396 .send(InternalStorageCommand::RunSinkDataflow(
1397 export.id,
1398 export.description,
1399 ));
1400 }
1401 }
1402 StorageCommand::AllowCompaction(id, frontier) => {
1403 soft_assert_or_log!(
1404 self.exports.contains_key(&id) || self.reported_frontiers.contains_key(&id),
1405 "AllowCompaction command for non-existent {id}"
1406 );
1407
1408 if frontier.is_empty() {
1409 // Indicates that we may drop `id`, as there are no more valid times to read.
1410 self.drop_collection(id);
1411 }
1412 }
1413 }
1414 }
1415
1416 /// Drop the identified storage collection from the storage state.
1417 fn drop_collection(&mut self, id: GlobalId) {
1418 fail_point!("crash_on_drop");
1419
1420 self.ingestions.remove(&id);
1421 self.exports.remove(&id);
1422
1423 let _ = self.latest_status_updates.remove(&id);
1424
1425 // This will stop reporting of frontiers.
1426 //
1427 // If this object still has its frontiers reported, we will notify the
1428 // client envd of the drop.
1429 if self.reported_frontiers.remove(&id).is_some() {
1430 // The only actions left are internal cleanup, so we can commit to
1431 // the client that these objects have been dropped.
1432 //
1433 // This must be done now rather than in response to `DropDataflow`,
1434 // otherwise we introduce the possibility of a timing issue where:
1435 // - We remove all tracking state from the storage state and send
1436 // `DropDataflow` (i.e. this block).
1437 // - While waiting to process that command, we reconcile with a new
1438 // envd. That envd has already committed to its catalog that this
1439 // object no longer exists.
1440 // - We process the `DropDataflow` command, and identify that this
1441 // object has been dropped.
1442 // - The next time `dropped_ids` is processed, we send a response
1443 // that this ID has been dropped, but the upstream state has no
1444 // record of that object having ever existed.
1445 self.dropped_ids.push(id);
1446 }
1447
1448 // Send through async worker for correct ordering with RunIngestion, and
1449 // dropping the dataflow is done on async worker response.
1450 if self.timely_worker_index == 0 {
1451 self.async_worker.drop_dataflow(id);
1452 }
1453 }
1454
1455 /// Drop the identified oneshot ingestion from the storage state.
1456 fn drop_oneshot_ingestion(&mut self, ingestion_id: uuid::Uuid) {
1457 let prev = self.oneshot_ingestions.remove(&ingestion_id);
1458 info!(%ingestion_id, existed = %prev.is_some(), "dropping oneshot ingestion");
1459 }
1460}