
mz_storage/storage_state.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

//! Worker-local state for storage timely instances.
//!
//! One instance of a [`Worker`], along with its contained [`StorageState`], is
//! part of an ensemble of storage workers that all run inside the same timely
//! cluster. We call this worker a _storage worker_ to disambiguate it from
//! other kinds of workers, such as other components that might share the same
//! timely cluster.
//!
//! ## Controller and internal communication
//!
//! A worker receives _external_ [`StorageCommands`](StorageCommand) from the
//! storage controller via a channel. Storage workers also share an _internal_
//! control/command fabric ([`internal_control`]). Internal commands go through
//! a sequencer dataflow that ensures that all workers receive all commands in
//! the same consistent order.
//!
//! We need to make sure that commands that cause dataflows to be rendered are
//! processed in the same consistent order across all workers because timely
//! requires this. To achieve this, we make sure that only internal commands can
//! cause dataflows to be rendered. External commands (from the controller)
//! cause internal commands to be broadcast (by only one worker), to get
//! dataflows rendered.
//!
//! The internal command fabric is also used to broadcast messages from a local
//! operator/worker to all workers, for example when we need to tear down and
//! restart a dataflow on all workers after an error is encountered.
//!
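//! For example, tearing down and restarting a dataflow from within an
//! operator boils down to broadcasting a single internal command (an
//! illustrative sketch; the corresponding handling lives in
//! [`Worker::handle_internal_storage_command`]):
//!
//! ```ignore
//! internal_cmd_tx.send(InternalStorageCommand::SuspendAndRestart {
//!     id,
//!     reason: "transient connection error".to_string(),
//! });
//! ```
//!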
//! ## Async Storage Worker
//!
//! The storage worker has a companion [`AsyncStorageWorker`] that must be used
//! when running code that requires `async`. This is needed because a timely
//! main loop cannot run `async` code.
//!
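//! Work is handed to the async worker by invoking one of its methods, and its
//! responses are picked up in the worker main loop, for example (a sketch,
//! mirroring the calls used in this module):
//!
//! ```ignore
//! // Only worker 0 hands work to the async worker:
//! storage_state.async_worker.update_ingestion_frontiers(id, ingestion_description);
//! // Later, in the main loop:
//! while let Ok(response) = storage_state.async_worker.try_recv() {
//!     worker.handle_async_worker_response(response);
//! }
//! ```
//!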
//! ## Example flow of commands for `RunIngestion`
//!
//! With external commands, internal commands, and the async worker,
//! understanding where and how commands from the controller are realized can
//! get complicated. We will follow the complete flow for `RunIngestion`, as an
//! example:
//!
//! 1. Worker receives a [`StorageCommand::RunIngestion`] command from the
//!    controller.
//! 2. This command is processed in [`StorageState::handle_storage_command`].
//!    This step cannot render dataflows, because it does not have access to the
//!    timely worker. It only sets up state that lasts over the whole lifetime
//!    of the source, such as the `reported_frontier`. Putting this reported
//!    frontier in place enables frontier reporting for that source. We do not
//!    start reporting when we only see an internal command for rendering a
//!    dataflow, because such a command can "overtake" the external
//!    `RunIngestion` command.
//! 3. During processing of that command, we call
//!    [`AsyncStorageWorker::update_ingestion_frontiers`], which causes a command to
//!    be sent to the async worker.
//! 4. We eventually get a response from the async worker:
//!    [`AsyncStorageWorkerResponse::IngestionFrontiersUpdated`].
//! 5. This response is handled in [`Worker::handle_async_worker_response`].
//! 6. Handling that response causes an
//!    [`InternalStorageCommand::CreateIngestionDataflow`] to be broadcast to
//!    all workers via the internal command fabric.
//! 7. This message will be processed (on each worker) in
//!    [`Worker::handle_internal_storage_command`]. This is what causes the
//!    required dataflow to be rendered on all workers.
//!
//! The process described above assumes that the `RunIngestion` is _not_ an
//! update, i.e. that it is in response to a `CREATE SOURCE`-like statement.
//!
//! The primary distinction when handling a `RunIngestion` that represents an
//! update is that it might fill out new internal state in the mid-level
//! clients on the way toward being run.
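//!
//! In condensed form, the non-update flow above is:
//!
//! ```text
//! controller ──RunIngestion──▶ handle_storage_command
//!   └─▶ async worker: update_ingestion_frontiers
//!         └─▶ IngestionFrontiersUpdated ──▶ handle_async_worker_response
//!               └─▶ broadcast CreateIngestionDataflow (via sequencer)
//!                     └─▶ handle_internal_storage_command renders the
//!                         dataflow on each worker
//! ```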

use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use fail::fail_point;
use mz_ore::now::NowFn;
use mz_ore::soft_assert_or_log;
use mz_ore::tracing::TracingHandle;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::operators::shard_source::ErrorHandler;
use mz_repr::{GlobalId, Timestamp};
use mz_rocksdb::config::SharedWriteBufferManager;
use mz_storage_client::client::{
    RunIngestionCommand, StatusUpdate, StorageCommand, StorageResponse,
};
use mz_storage_types::AlterCompatible;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs::STORAGE_SERVER_MAINTENANCE_INTERVAL;
use mz_storage_types::oneshot_sources::OneshotIngestionDescription;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::IngestionDescription;
use mz_timely_util::builder_async::PressOnDropButton;
use mz_txn_wal::operator::TxnsContext;
use timely::communication::Allocate;
use timely::order::PartialOrder;
use timely::progress::Timestamp as _;
use timely::progress::frontier::Antichain;
use timely::worker::Worker as TimelyWorker;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::{mpsc, watch};
use tokio::time::Instant;
use tracing::{debug, info, warn};
use uuid::Uuid;

use crate::internal_control::{
    self, DataflowParameters, InternalCommandReceiver, InternalCommandSender,
    InternalStorageCommand,
};
use crate::metrics::StorageMetrics;
use crate::statistics::{AggregatedStatistics, SinkStatistics, SourceStatistics};
use crate::storage_state::async_storage_worker::{AsyncStorageWorker, AsyncStorageWorkerResponse};

pub mod async_storage_worker;

type CommandReceiver = mpsc::UnboundedReceiver<StorageCommand>;
type ResponseSender = mpsc::UnboundedSender<StorageResponse>;

/// State maintained for each worker thread.
///
/// Much of this state can be viewed as local variables for the worker thread,
/// holding state that persists across function calls.
pub struct Worker<'w, A: Allocate> {
    /// The underlying Timely worker.
    ///
    /// NOTE: This is `pub` for testing.
    pub timely_worker: &'w mut TimelyWorker<A>,
    /// The channel over which communication handles for newly connected clients
    /// are delivered.
    pub client_rx: mpsc::UnboundedReceiver<(Uuid, CommandReceiver, ResponseSender)>,
    /// The state associated with collection ingress and egress.
    pub storage_state: StorageState,
}

impl<'w, A: Allocate> Worker<'w, A> {
    /// Creates new `Worker` state from the given components.
    pub fn new(
        timely_worker: &'w mut TimelyWorker<A>,
        client_rx: mpsc::UnboundedReceiver<(Uuid, CommandReceiver, ResponseSender)>,
        metrics: StorageMetrics,
        now: NowFn,
        connection_context: ConnectionContext,
        instance_context: StorageInstanceContext,
        persist_clients: Arc<PersistClientCache>,
        txns_ctx: TxnsContext,
        tracing_handle: Arc<TracingHandle>,
        shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
    ) -> Self {
        // It is very important that we only create the internal control
        // flow/command sequencer once because a) the worker state is re-used
        // when a new client connects and b) dataflows that have already been
        // rendered into the timely worker are reused as well.
        //
        // If we created a new sequencer every time we get a new client (likely
        // because the controller re-started and re-connected), dataflows that
        // were rendered before would still hold a handle to the old sequencer
        // but we would not read their commands anymore.
        let (internal_cmd_tx, internal_cmd_rx) =
            internal_control::setup_command_sequencer(timely_worker);

        let storage_configuration =
            StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs());

        // We always initialize as read_only=true. Only when we're explicitly
        // allowed do we switch to doing writes.
        let (read_only_tx, read_only_rx) = watch::channel(true);

        // Similar to the internal command sequencer, it is very important that
        // we only create the async worker once because a) the worker state is
        // re-used when a new client connects and b) commands that have already
        // been sent and might yield a response will be lost if a new iteration
        // of `run_client` creates a new async worker.
        //
        // If we created a new async worker every time we get a new client
        // (likely because the controller re-started and re-connected), we can
        // get into an inconsistent state where we think that a dataflow has
        // been rendered, for example because there is an entry in
        // `StorageState::ingestions`, while there is not yet a dataflow. This
        // happens because the dataflow only gets rendered once we get a
        // response from the async worker and send off an internal command.
        //
        // The core idea is that both the sequencer and the async worker are
        // part of the per-worker state, and must be treated as such, meaning
        // they must survive between invocations of `run_client`.

        // TODO(aljoscha): This thread unparking business seems brittle, but that's
        // also how the command channel works currently. We can wrap it inside a
        // struct that holds both a channel and a `Thread`, but I don't
        // think that would help too much.
        let async_worker = async_storage_worker::AsyncStorageWorker::new(
            thread::current(),
            Arc::clone(&persist_clients),
        );
        let cluster_memory_limit = instance_context.cluster_memory_limit;

        let storage_state = StorageState {
            source_uppers: BTreeMap::new(),
            source_tokens: BTreeMap::new(),
            metrics,
            reported_frontiers: BTreeMap::new(),
            ingestions: BTreeMap::new(),
            exports: BTreeMap::new(),
            oneshot_ingestions: BTreeMap::new(),
            now,
            timely_worker_index: timely_worker.index(),
            timely_worker_peers: timely_worker.peers(),
            instance_context,
            persist_clients,
            txns_ctx,
            sink_tokens: BTreeMap::new(),
            sink_write_frontiers: BTreeMap::new(),
            dropped_ids: Vec::new(),
            aggregated_statistics: AggregatedStatistics::new(
                timely_worker.index(),
                timely_worker.peers(),
            ),
            shared_status_updates: Default::default(),
            latest_status_updates: Default::default(),
            initial_status_reported: Default::default(),
            internal_cmd_tx,
            internal_cmd_rx,
            read_only_tx,
            read_only_rx,
            async_worker,
            storage_configuration,
            dataflow_parameters: DataflowParameters::new(
                shared_rocksdb_write_buffer_manager,
                cluster_memory_limit,
            ),
            tracing_handle,
            server_maintenance_interval: Duration::ZERO,
        };

        // TODO(aljoscha): We might want `async_worker` and `internal_cmd_tx` to
        // be fields of `Worker` instead of `StorageState`, but at least for the
        // command flow sources and sinks need access to that. We can refactor
        // this once we have a clearer boundary between what sources/sinks need
        // and the full "power" of the internal command flow, which should stay
        // internal to the worker/not be exposed to source/sink implementations.
        Self {
            timely_worker,
            client_rx,
            storage_state,
        }
    }
}

/// Worker-local state related to the ingress or egress of collections of data.
pub struct StorageState {
    /// The highest observed upper frontier for each collection.
    ///
    /// This is shared among all source instances, so that they can jointly advance the
    /// frontier even as other instances are created and dropped. Ideally, the Storage
    /// module would eventually provide one source of truth on this rather than multiple,
    /// and we should aim for that but are not there yet.
    pub source_uppers: BTreeMap<GlobalId, Rc<RefCell<Antichain<mz_repr::Timestamp>>>>,
    /// Handles to created sources, keyed by ID.
    /// NB: The type of the tokens must not be changed to something other than `PressOnDropButton`
    /// to prevent usage of custom shutdown tokens that are tricky to get right.
    pub source_tokens: BTreeMap<GlobalId, Vec<PressOnDropButton>>,
    /// Metrics for storage objects.
    pub metrics: StorageMetrics,
    /// Tracks the conditional write frontiers we have reported.
    pub reported_frontiers: BTreeMap<GlobalId, Antichain<Timestamp>>,
    /// Descriptions of each installed ingestion.
    pub ingestions: BTreeMap<GlobalId, IngestionDescription<CollectionMetadata>>,
    /// Descriptions of each installed export.
    pub exports: BTreeMap<GlobalId, StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>,
    /// Descriptions of oneshot ingestions that are currently running.
    pub oneshot_ingestions: BTreeMap<uuid::Uuid, OneshotIngestionDescription<ProtoBatch>>,
    /// A function that returns the current time; used, for example, to
    /// timestamp status updates.
    pub now: NowFn,
    /// Index of the associated timely dataflow worker.
    pub timely_worker_index: usize,
    /// Peers in the associated timely dataflow worker.
    pub timely_worker_peers: usize,
    /// Other configuration for sources and sinks.
    pub instance_context: StorageInstanceContext,
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    /// This is intentionally shared between workers.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// Tokens that should be dropped when a dataflow is dropped to clean up
    /// associated state.
    /// NB: The type of the tokens must not be changed to something other than `PressOnDropButton`
    /// to prevent usage of custom shutdown tokens that are tricky to get right.
    pub sink_tokens: BTreeMap<GlobalId, Vec<PressOnDropButton>>,
    /// Frontier of sink writes (all subsequent writes will be at times greater
    /// than or equal to this frontier).
    pub sink_write_frontiers: BTreeMap<GlobalId, Rc<RefCell<Antichain<Timestamp>>>>,
    /// Collection ids that have been dropped but not yet reported as dropped.
    pub dropped_ids: Vec<GlobalId>,

    /// Statistics for sources and sinks.
    pub aggregated_statistics: AggregatedStatistics,

    /// A place shared with running dataflows, so that health operators can
    /// report status updates back to us.
    ///
    /// **NOTE**: Operators that append to this collection should take care to only add new
    /// status updates if the status of the ingestion/export in question has _changed_.
    pub shared_status_updates: Rc<RefCell<Vec<StatusUpdate>>>,

    /// The latest status update for each object.
    pub latest_status_updates: BTreeMap<GlobalId, StatusUpdate>,

    /// Whether we have reported the initial status after connecting to a new client.
    /// This is reset to false when a new client connects.
    pub initial_status_reported: bool,

    /// Sender for cluster-internal storage commands. These can be sent from
    /// within workers/operators and will be distributed to all workers. For
    /// example, for shutting down an entire dataflow from within an
    /// operator/worker.
    pub internal_cmd_tx: InternalCommandSender,
    /// Receiver for cluster-internal storage commands.
    pub internal_cmd_rx: InternalCommandReceiver,

    /// When this replica/cluster is in read-only mode it must not make any
    /// changes to external state. This flag can only be changed by a
    /// [StorageCommand::AllowWrites].
    ///
    /// Everything running on this replica/cluster must obey this flag. At the
    /// time of writing, nothing currently looks at this flag.
    /// TODO(benesch): fix this.
    ///
    /// NOTE: In the future, we might want a more complicated flag, for example
    /// something that tells us after which timestamp we are allowed to write.
    /// In this first version we are keeping things as simple as possible!
    pub read_only_rx: watch::Receiver<bool>,

    /// Send-side for read-only state.
    pub read_only_tx: watch::Sender<bool>,

    /// Async worker companion, used for running code that requires async, which
    /// the timely main loop cannot do.
    pub async_worker: AsyncStorageWorker<mz_repr::Timestamp>,

    /// Configuration for source and sink connections.
    pub storage_configuration: StorageConfiguration,
    /// Dynamically configurable parameters that control how dataflows are rendered.
    /// NOTE(guswynn): we should consider moving these into `storage_configuration`.
    pub dataflow_parameters: DataflowParameters,

    /// A process-global handle to tracing configuration.
    pub tracing_handle: Arc<TracingHandle>,

    /// Interval at which to perform server maintenance tasks. Set to a zero interval to
    /// perform maintenance with every `step_or_park` invocation.
    pub server_maintenance_interval: Duration,
}

impl StorageState {
    /// Return an error handler that triggers a suspend and restart of the corresponding storage
    /// dataflow.
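    ///
    /// Given some error `e`, the returned handler broadcasts
    /// [`InternalStorageCommand::SuspendAndRestart`] for `id`, with a reason
    /// of the form `"{context}: {e:#}"`. A sketch of intended use
    /// (hypothetical call site; the real call sites live in the rendering
    /// code):
    ///
    /// ```ignore
    /// let error_handler = storage_state.error_handler("kafka_source", source_id);
    /// ```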
    pub fn error_handler(&self, context: &'static str, id: GlobalId) -> ErrorHandler {
        let tx = self.internal_cmd_tx.clone();
        ErrorHandler::signal(move |e| {
            tx.send(InternalStorageCommand::SuspendAndRestart {
                id,
                reason: format!("{context}: {e:#}"),
            })
        })
    }
}

/// Extra context for a storage instance.
/// This is extra information that is used when rendering sources
/// and sinks that is not tied to the source/connection configuration itself.
#[derive(Clone)]
pub struct StorageInstanceContext {
    /// A directory that can be used for scratch work.
    pub scratch_directory: Option<PathBuf>,
    /// A global `rocksdb::Env`, shared across ALL instances of `RocksDB` (even
    /// across sources!). This `Env` lets us control some resources (like background threads)
    /// process-wide.
    pub rocksdb_env: rocksdb::Env,
    /// The memory limit of the materialize cluster replica. This will
    /// be used to calculate and configure the maximum inflight bytes for backpressure.
    pub cluster_memory_limit: Option<usize>,
}

impl StorageInstanceContext {
    /// Build a new `StorageInstanceContext`.
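    ///
    /// Falls back to an in-memory RocksDB environment when no scratch
    /// directory is available. A sketch of use (hypothetical values):
    ///
    /// ```ignore
    /// let ctx = StorageInstanceContext::new(Some(PathBuf::from("/scratch")), Some(64 << 30))?;
    /// ```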
    pub fn new(
        scratch_directory: Option<PathBuf>,
        cluster_memory_limit: Option<usize>,
    ) -> Result<Self, anyhow::Error> {
        // If no file system is available, fall back to running RocksDB in memory.
        let rocksdb_env = if scratch_directory.is_some() {
            rocksdb::Env::new()?
        } else {
            rocksdb::Env::mem_env()?
        };

        Ok(Self {
            scratch_directory,
            rocksdb_env,
            cluster_memory_limit,
        })
    }
}

impl<'w, A: Allocate> Worker<'w, A> {
    /// Waits for client connections and runs them to completion.
    pub fn run(&mut self) {
        while let Some((_nonce, rx, tx)) = self.client_rx.blocking_recv() {
            self.run_client(rx, tx);
        }
    }

    /// Runs this (timely) storage worker until the given `command_rx` is
    /// disconnected.
    ///
    /// See the [module documentation](crate::storage_state) for this
    /// worker's responsibilities, how it communicates with the other workers,
    /// and how commands flow from the controller and through the workers.
    fn run_client(&mut self, mut command_rx: CommandReceiver, response_tx: ResponseSender) {
        // At this point, all workers are still reading from the command flow.
        if self.reconcile(&mut command_rx).is_err() {
            return;
        }

        // The last time we reported statistics.
        let mut last_stats_time = Instant::now();

        // The last time we did periodic maintenance.
        let mut last_maintenance = std::time::Instant::now();

        let mut disconnected = false;
        while !disconnected {
            let config = &self.storage_state.storage_configuration;
            let stats_interval = config.parameters.statistics_collection_interval;

            let maintenance_interval = self.storage_state.server_maintenance_interval;

            let now = std::time::Instant::now();
            // Determine if we need to perform maintenance, which is true if `maintenance_interval`
            // time has passed since the last maintenance.
            let sleep_duration;
            if now >= last_maintenance + maintenance_interval {
                last_maintenance = now;
                sleep_duration = None;

                self.report_frontier_progress(&response_tx);
            } else {
                // We didn't perform maintenance, sleep until the next maintenance interval.
                let next_maintenance = last_maintenance + maintenance_interval;
                sleep_duration = Some(next_maintenance.saturating_duration_since(now))
            }

            // Ask Timely to execute a unit of work.
            //
            // If there are no pending commands or responses from the async
            // worker, we ask Timely to park the thread if there's nothing to
            // do. We rely on another thread unparking us when there's new work
            // to be done, e.g., when sending a command or when new Kafka
            // messages have arrived.
            //
            // It is critical that we allow Timely to park iff there are no
            // pending commands or responses. The command may have already been
            // consumed by the call to `client_rx.recv`. See:
            // https://github.com/MaterializeInc/materialize/pull/13973#issuecomment-1200312212
            if command_rx.is_empty() && self.storage_state.async_worker.is_empty() {
                // Make sure we wake up again to report any pending statistics updates.
                let mut park_duration = stats_interval.saturating_sub(last_stats_time.elapsed());
                if let Some(sleep_duration) = sleep_duration {
                    park_duration = std::cmp::min(sleep_duration, park_duration);
                }
                self.timely_worker.step_or_park(Some(park_duration));
            } else {
                self.timely_worker.step();
            }

            // Report any dropped IDs.
            for id in std::mem::take(&mut self.storage_state.dropped_ids) {
                self.send_storage_response(&response_tx, StorageResponse::DroppedId(id));
            }

            self.process_oneshot_ingestions(&response_tx);

            self.report_status_updates(&response_tx);

            if last_stats_time.elapsed() >= stats_interval {
                self.report_storage_statistics(&response_tx);
                last_stats_time = Instant::now();
            }

            // Handle any received commands.
            loop {
                match command_rx.try_recv() {
                    Ok(cmd) => self.storage_state.handle_storage_command(cmd),
                    Err(TryRecvError::Empty) => break,
                    Err(TryRecvError::Disconnected) => {
                        disconnected = true;
                        break;
                    }
                }
            }

            // Handle responses from the async worker.
            while let Ok(response) = self.storage_state.async_worker.try_recv() {
                self.handle_async_worker_response(response);
            }

            // Handle any received internal commands.
            while let Some(command) = self.storage_state.internal_cmd_rx.try_recv() {
                self.handle_internal_storage_command(command);
            }
        }
    }

    /// Entry point for applying a response from the async storage worker.
    pub fn handle_async_worker_response(
        &self,
        async_response: AsyncStorageWorkerResponse<mz_repr::Timestamp>,
    ) {
        // NOTE: If we want to share the load of async processing we
        // have to change `handle_storage_command` and change this
        // assert.
        assert_eq!(
            self.timely_worker.index(),
            0,
            "only worker #0 is doing async processing"
        );
        match async_response {
            AsyncStorageWorkerResponse::IngestionFrontiersUpdated {
                id,
                ingestion_description,
                as_of,
                resume_uppers,
                source_resume_uppers,
            } => {
                self.storage_state.internal_cmd_tx.send(
                    InternalStorageCommand::CreateIngestionDataflow {
                        id,
                        ingestion_description,
                        as_of,
                        resume_uppers,
                        source_resume_uppers,
                    },
                );
            }
            AsyncStorageWorkerResponse::ExportFrontiersUpdated { id, description } => {
                self.storage_state
                    .internal_cmd_tx
                    .send(InternalStorageCommand::RunSinkDataflow(id, description));
            }
            AsyncStorageWorkerResponse::DropDataflow(id) => {
                self.storage_state
                    .internal_cmd_tx
                    .send(InternalStorageCommand::DropDataflow(vec![id]));
            }
        }
    }

    /// Entry point for applying an internal storage command.
    pub fn handle_internal_storage_command(&mut self, internal_cmd: InternalStorageCommand) {
        match internal_cmd {
            InternalStorageCommand::SuspendAndRestart { id, reason } => {
                info!(
                    "worker {}/{} initiating suspend-and-restart for {id} because of: {reason}",
                    self.timely_worker.index(),
                    self.timely_worker.peers(),
                );

                let maybe_ingestion = self.storage_state.ingestions.get(&id).cloned();
                if let Some(ingestion_description) = maybe_ingestion {
                    // Yank the token of the previously existing source dataflow. Note that this
                    // token also includes any source exports/subsources.
                    let maybe_token = self.storage_state.source_tokens.remove(&id);
                    if maybe_token.is_none() {
                        // Something has dropped the source. Make sure we don't
                        // accidentally re-create it.
                        return;
                    }

                    // This needs to be done by one worker, which will
                    // broadcast a `CreateIngestionDataflow` command to all
                    // workers based on the response that contains the
                    // resumption upper.
                    //
                    // Doing this separately on each worker could lead to
                    // differing resume_uppers which might lead to all kinds of
                    // mayhem.
                    //
                    // TODO(aljoscha): If we ever become worried that this is
                    // putting undue pressure on worker 0 we can pick the
                    // designated worker for a source/sink based on `id.hash()`.
                    if self.timely_worker.index() == 0 {
                        for (id, _) in ingestion_description.source_exports.iter() {
                            self.storage_state
                                .aggregated_statistics
                                .advance_global_epoch(*id);
                        }
                        self.storage_state
                            .async_worker
                            .update_ingestion_frontiers(id, ingestion_description);
                    }

                    // Continue with other commands.
                    return;
                }

                let maybe_sink = self.storage_state.exports.get(&id).cloned();
                if let Some(sink_description) = maybe_sink {
                    // Yank the token of the previously existing sink
                    // dataflow.
                    let maybe_token = self.storage_state.sink_tokens.remove(&id);

                    if maybe_token.is_none() {
                        // Something has dropped the sink. Make sure we don't
                        // accidentally re-create it.
                        return;
                    }

                    // This needs to be broadcast by one worker and go through
                    // the internal command fabric, to ensure consistent
                    // ordering of dataflow rendering across all workers.
                    if self.timely_worker.index() == 0 {
                        self.storage_state
                            .aggregated_statistics
                            .advance_global_epoch(id);
                        self.storage_state
                            .async_worker
                            .update_sink_frontiers(id, sink_description);
                    }

                    // Continue with other commands.
                    return;
                }

                if !self
                    .storage_state
                    .ingestions
                    .values()
                    .any(|v| v.source_exports.contains_key(&id))
                {
                    // Our current approach to dropping a source results in a race between shard
                    // finalization (which happens in the controller) and dataflow shutdown (which
                    // happens in clusterd). If a source is created and dropped fast enough, or the
                    // two commands get sufficiently delayed, then it's possible to receive a
                    // SuspendAndRestart command for an unknown source. We cannot assert that this
                    // never happens, but we log a warning here to track how often it does.
                    warn!(
                        "got InternalStorageCommand::SuspendAndRestart for something that is not a source or sink: {id}"
                    );
                }
            }
            InternalStorageCommand::CreateIngestionDataflow {
                id: ingestion_id,
                mut ingestion_description,
                as_of,
                mut resume_uppers,
                mut source_resume_uppers,
            } => {
                info!(
                    ?as_of,
                    ?resume_uppers,
                    "worker {}/{} trying to (re-)start ingestion {ingestion_id}",
                    self.timely_worker.index(),
                    self.timely_worker.peers(),
                );

                // We initialize statistics before we prune finished exports. We
                // still want to export statistics for these, plus the rendering
                // machinery will get confused if there are not at least
                // statistics for the "main" source.
                for (export_id, export) in ingestion_description.source_exports.iter() {
                    let resume_upper = resume_uppers[export_id].clone();
                    self.storage_state.aggregated_statistics.initialize_source(
                        *export_id,
                        ingestion_id,
                        resume_upper.clone(),
                        || {
                            SourceStatistics::new(
                                *export_id,
                                self.storage_state.timely_worker_index,
                                &self.storage_state.metrics.source_statistics,
                                ingestion_id,
                                &export.storage_metadata.data_shard,
                                export.data_config.envelope.clone(),
                                resume_upper,
                            )
                        },
                    );
                }

                let finished_exports: BTreeSet<GlobalId> = resume_uppers
                    .iter()
                    .filter(|(_, frontier)| frontier.is_empty())
                    .map(|(id, _)| *id)
                    .collect();

                resume_uppers.retain(|id, _| !finished_exports.contains(id));
                source_resume_uppers.retain(|id, _| !finished_exports.contains(id));
                ingestion_description
                    .source_exports
                    .retain(|id, _| !finished_exports.contains(id));

                for id in ingestion_description.collection_ids() {
                    // If there is already a shared upper, we re-use it, to make
                    // sure that parties that are already using the shared upper
                    // can continue doing so.
                    let source_upper = self
                        .storage_state
                        .source_uppers
                        .entry(id.clone())
                        .or_insert_with(|| {
                            Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())))
                        });

                    let mut source_upper = source_upper.borrow_mut();
                    if !source_upper.is_empty() {
                        source_upper.clear();
                        source_upper.insert(mz_repr::Timestamp::minimum());
                    }
                }

                // If all subsources of the source are finished, we can skip rendering entirely.
                // Also, if `as_of` is empty, the dataflow has been finalized, so we can skip it as
                // well.
                //
                // TODO(guswynn|petrosagg): this is a bit hacky, and is a consequence of storage state
                // management being a bit of a mess. We should clean this up and remove weird if
                // statements like this.
                if resume_uppers.values().all(|frontier| frontier.is_empty()) || as_of.is_empty() {
                    info!(
                        ?resume_uppers,
                        ?as_of,
                        "worker {}/{} skipping building ingestion dataflow \
                        for {ingestion_id} because the ingestion is finished",
                        self.timely_worker.index(),
                        self.timely_worker.peers(),
                    );
                    return;
                }

                crate::render::build_ingestion_dataflow(
                    self.timely_worker,
                    &mut self.storage_state,
                    ingestion_id,
                    ingestion_description,
                    as_of,
                    resume_uppers,
                    source_resume_uppers,
                );
            }
            InternalStorageCommand::RunOneshotIngestion {
                ingestion_id,
                collection_id,
                collection_meta,
                request,
            } => {
                crate::render::build_oneshot_ingestion_dataflow(
                    self.timely_worker,
                    &mut self.storage_state,
                    ingestion_id,
                    collection_id,
                    collection_meta,
                    request,
                );
            }
            InternalStorageCommand::RunSinkDataflow(sink_id, sink_description) => {
                info!(
                    "worker {}/{} trying to (re-)start sink {sink_id}",
                    self.timely_worker.index(),
                    self.timely_worker.peers(),
                );

                {
                    // If there is already a shared write frontier, we re-use it, to
                    // make sure that parties that are already using the shared
                    // frontier can continue doing so.
                    let sink_write_frontier = self
                        .storage_state
                        .sink_write_frontiers
                        .entry(sink_id.clone())
                        .or_insert_with(|| Rc::new(RefCell::new(Antichain::new())));

                    let mut sink_write_frontier = sink_write_frontier.borrow_mut();
                    sink_write_frontier.clear();
                    sink_write_frontier.insert(mz_repr::Timestamp::minimum());
                }
                self.storage_state
                    .aggregated_statistics
                    .initialize_sink(sink_id, || {
                        SinkStatistics::new(
                            sink_id,
                            self.storage_state.timely_worker_index,
                            &self.storage_state.metrics.sink_statistics,
                        )
                    });

                crate::render::build_export_dataflow(
                    self.timely_worker,
                    &mut self.storage_state,
                    sink_id,
                    sink_description,
                );
            }
            InternalStorageCommand::DropDataflow(ids) => {
                for id in &ids {
                    // Clean up per-source / per-sink state.
                    self.storage_state.source_uppers.remove(id);
                    self.storage_state.source_tokens.remove(id);

                    self.storage_state.sink_tokens.remove(id);
                    self.storage_state.sink_write_frontiers.remove(id);

                    self.storage_state.aggregated_statistics.deinitialize(*id);
                }
            }
            InternalStorageCommand::UpdateConfiguration { storage_parameters } => {
                self.storage_state
                    .dataflow_parameters
                    .update(storage_parameters.clone());
                self.storage_state
                    .storage_configuration
                    .update(storage_parameters);

                // Clear out the updates as we no longer forward them to anyone else to process.
                // We clone `StorageState::storage_configuration` many times during rendering
                // and want to avoid cloning these unused updates.
                self.storage_state
                    .storage_configuration
                    .parameters
                    .dyncfg_updates = Default::default();

                // Remember the maintenance interval locally to avoid reading it from the config set on
                // every server iteration.
                self.storage_state.server_maintenance_interval =
                    STORAGE_SERVER_MAINTENANCE_INTERVAL
                        .get(self.storage_state.storage_configuration.config_set());
            }
            InternalStorageCommand::StatisticsUpdate { sources, sinks } => self
                .storage_state
                .aggregated_statistics
                .ingest(sources, sinks),
        }
    }

    /// Emit information about write frontier progress, along with information that should
    /// be made durable for this to be the case.
    ///
    /// The write frontier progress is "conditional" in that it is not until the information is made
    /// durable that the data are emitted to downstream workers, and indeed they should not rely on
    /// the completeness of what they hear until the information is made durable.
    ///
    /// Specifically, this sends information about new timestamp bindings created by dataflow workers,
    /// with the understanding that, if made durable (and ack'd back to the workers), the source will
    /// in fact progress with this write frontier.
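    ///
    /// For example (illustrative): if the shared upper of a source advances
    /// from `[5]` to `[8]` while the reported frontier is still `[5]`, we emit
    /// a `FrontierUpper` response with `[8]` and remember `[8]` as reported; a
    /// change that does not strictly advance the frontier is ignored.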
    pub fn report_frontier_progress(&mut self, response_tx: &ResponseSender) {
        let mut new_uppers = Vec::new();

        // Check if any observed frontier should advance the reported frontiers.
        for (id, frontier) in self
            .storage_state
            .source_uppers
            .iter()
            .chain(self.storage_state.sink_write_frontiers.iter())
        {
            let Some(reported_frontier) = self.storage_state.reported_frontiers.get_mut(id) else {
                // Frontier reporting has not yet been started for this object.
                // Potentially because this timely worker has not yet seen the
                // `CreateSources` command.
                continue;
            };

            let observed_frontier = frontier.borrow();

            // Only report a frontier if it *advances* the reported frontier, not if it merely
            // *changes* it. This is protection against `frontier` lagging behind what we have
            // conditionally reported.
            if PartialOrder::less_than(reported_frontier, &observed_frontier) {
                new_uppers.push((*id, observed_frontier.clone()));
                reported_frontier.clone_from(&observed_frontier);
            }
        }

        for (id, upper) in new_uppers {
            self.send_storage_response(response_tx, StorageResponse::FrontierUpper(id, upper));
        }
    }

    /// Pumps latest status updates from the buffer shared with operators and
    /// reports any updates that need reporting.
    pub fn report_status_updates(&mut self, response_tx: &ResponseSender) {
        // If we haven't done the initial status report, report all current statuses.
        if !self.storage_state.initial_status_reported {
            // We pull initially reported status updates to "now", so that they
            // sort as the latest update in internal status collections. This
            // makes it so that a newly bootstrapped envd can append status
            // updates to internal status collections that report an accurate
            // view as of the time when they came up.
            let now_ts = mz_ore::now::to_datetime((self.storage_state.now)());
            let status_updates = self
                .storage_state
                .latest_status_updates
                .values()
                .cloned()
                .map(|mut update| {
                    update.timestamp = now_ts.clone();
                    update
                });
            for update in status_updates {
                self.send_storage_response(response_tx, StorageResponse::StatusUpdate(update));
            }
            self.storage_state.initial_status_reported = true;
        }

        // Pump updates into our state and stage them for reporting.
        for shared_update in self.storage_state.shared_status_updates.take() {
            self.send_storage_response(
                response_tx,
                StorageResponse::StatusUpdate(shared_update.clone()),
            );

            self.storage_state
                .latest_status_updates
                .insert(shared_update.id, shared_update);
        }
    }

    /// Report source statistics back to the controller.
    pub fn report_storage_statistics(&mut self, response_tx: &ResponseSender) {
        let (sources, sinks) = self.storage_state.aggregated_statistics.emit_local();
        if !sources.is_empty() || !sinks.is_empty() {
            self.storage_state
                .internal_cmd_tx
                .send(InternalStorageCommand::StatisticsUpdate { sources, sinks })
        }

        let (sources, sinks) = self.storage_state.aggregated_statistics.snapshot();
        if !sources.is_empty() || !sinks.is_empty() {
            self.send_storage_response(
                response_tx,
                StorageResponse::StatisticsUpdates(sources, sinks),
            );
        }
    }

    /// Send a response to the coordinator.
    fn send_storage_response(&self, response_tx: &ResponseSender, response: StorageResponse) {
        // Ignore send errors because the coordinator is free to ignore our
        // responses. This happens during shutdown.
        let _ = response_tx.send(response);
    }

    fn process_oneshot_ingestions(&mut self, response_tx: &ResponseSender) {
        for (ingestion_id, ingestion_state) in &mut self.storage_state.oneshot_ingestions {
            loop {
                match ingestion_state.results.try_recv() {
                    Ok(result) => {
                        let response = match result {
                            Ok(maybe_batch) => maybe_batch.into_iter().map(Result::Ok).collect(),
                            Err(err) => vec![Err(err)],
                        };
                        let staged_batches = BTreeMap::from([(*ingestion_id, response)]);
                        let _ = response_tx.send(StorageResponse::StagedBatches(staged_batches));
                    }
                    Err(TryRecvError::Empty) => {
                        break;
                    }
                    Err(TryRecvError::Disconnected) => {
                        break;
                    }
                }
            }
        }
    }

    /// Extract commands until `InitializationComplete`, and make the worker
    /// reflect those commands. If the worker cannot be made to reflect the
    /// commands, return an error.
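    ///
    /// For example (illustrative): a drained command stream of
    /// `[RunIngestion(a), AllowCompaction(a, []), RunIngestion(b)]` reduces to
    /// just `RunIngestion(b)`. The empty `since` marks `a` as dropped, and `b`
    /// is retained only if its description differs from an ingestion that is
    /// already running.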
    fn reconcile(&mut self, command_rx: &mut CommandReceiver) -> Result<(), ()> {
        let worker_id = self.timely_worker.index();

        // To initialize the connection, we want to drain all commands until we
        // receive a `StorageCommand::InitializationComplete` command to form a
        // target command state.
        let mut commands = vec![];
        loop {
            match command_rx.blocking_recv().ok_or(())? {
                StorageCommand::InitializationComplete => break,
                command => commands.push(command),
            }
        }

        // Track which frontiers this envd expects; we will also set their
        // initial timestamp to the minimum timestamp to reset them as we don't
        // know what frontiers the new envd expects.
        let mut expected_objects = BTreeSet::new();

        let mut drop_commands = BTreeSet::new();
        let mut running_ingestion_descriptions = self.storage_state.ingestions.clone();
        let mut running_exports_descriptions = self.storage_state.exports.clone();

        let mut create_oneshot_ingestions: BTreeSet<Uuid> = BTreeSet::new();
        let mut cancel_oneshot_ingestions: BTreeSet<Uuid> = BTreeSet::new();

        for command in &mut commands {
            match command {
                StorageCommand::Hello { .. } => {
                    panic!("Hello must be captured before")
                }
                StorageCommand::AllowCompaction(id, since) => {
                    info!(%worker_id, ?id, ?since, "reconcile: received AllowCompaction command");

                    // Collect all "drop commands". These are `AllowCompaction`
                    // commands that compact to the empty since. Then, later, we make sure
                    // we retain only those `Create*` commands that are not dropped. We
                    // assume that the `AllowCompaction` command is ordered after the
                    // `Create*` commands but don't assert that.
                    // WIP: Should we assert?
                    if since.is_empty() {
                        drop_commands.insert(*id);
                    }
                }
                StorageCommand::RunIngestion(ingestion) => {
                    info!(%worker_id, ?ingestion, "reconcile: received RunIngestion command");

                    // Ensure that ingestions are forward-rolling alter compatible.
                    let prev = running_ingestion_descriptions
                        .insert(ingestion.id, ingestion.description.clone());

                    if let Some(prev_ingest) = prev {
                        // If the new ingestion is not exactly equal to the currently running
                        // ingestion, we must either track that we need to synthesize an update
                        // command to change the ingestion, or panic.
                        prev_ingest
                            .alter_compatible(ingestion.id, &ingestion.description)
                            .expect("only alter compatible ingestions permitted");
                    }
                }
                StorageCommand::RunSink(export) => {
                    info!(%worker_id, ?export, "reconcile: received RunSink command");

                    // Ensure that exports are forward-rolling alter compatible.
                    let prev =
                        running_exports_descriptions.insert(export.id, export.description.clone());

                    if let Some(prev_export) = prev {
                        prev_export
                            .alter_compatible(export.id, &export.description)
                            .expect("only alter compatible exports permitted");
                    }
                }
                StorageCommand::RunOneshotIngestion(ingestion) => {
                    info!(%worker_id, ?ingestion, "reconcile: received RunOneshotIngestion command");
                    create_oneshot_ingestions.insert(ingestion.ingestion_id);
                }
                StorageCommand::CancelOneshotIngestion(uuid) => {
                    info!(%worker_id, %uuid, "reconcile: received CancelOneshotIngestion command");
                    cancel_oneshot_ingestions.insert(*uuid);
                }
                StorageCommand::InitializationComplete
                | StorageCommand::AllowWrites
                | StorageCommand::UpdateConfiguration(_) => (),
            }
        }

1073        let mut seen_most_recent_definition = BTreeSet::new();
1074
1075        // We iterate over this backward to ensure that we keep only the most recent ingestion
1076        // description.
1077        let mut filtered_commands = VecDeque::new();
1078        for mut command in commands.into_iter().rev() {
1079            let mut should_keep = true;
1080            match &mut command {
1081                StorageCommand::Hello { .. } => {
1082                    panic!("Hello must be captured before")
1083                }
1084                StorageCommand::RunIngestion(ingestion) => {
1085                    // Subsources can be dropped independently of their
1086                    // primary source, so we evaluate them in a separate
1087                    // loop.
1088                    for export_id in ingestion
1089                        .description
1090                        .source_exports
1091                        .keys()
1092                        .filter(|export_id| **export_id != ingestion.id)
1093                    {
1094                        if drop_commands.remove(export_id) {
1095                            info!(%worker_id, %export_id, "reconcile: dropping subsource");
1096                            self.storage_state.dropped_ids.push(*export_id);
1097                        }
1098                    }
1099
1100                    if drop_commands.remove(&ingestion.id)
1101                        || self.storage_state.dropped_ids.contains(&ingestion.id)
1102                    {
1103                        info!(%worker_id, %ingestion.id, "reconcile: dropping ingestion");
1104
1105                        // If an ingestion is dropped, so too must all of
1106                        // its subsources (i.e. ingestion exports, as well
1107                        // as its progress subsource).
1108                        for id in ingestion.description.collection_ids() {
1109                            drop_commands.remove(&id);
1110                            self.storage_state.dropped_ids.push(id);
1111                        }
1112                        should_keep = false;
1113                    } else {
1114                        let most_recent_defintion =
1115                            seen_most_recent_definition.insert(ingestion.id);
1116
1117                        if most_recent_defintion {
1118                            // If this is the most recent definition, this
1119                            // is what we will be running when
1120                            // reconciliation completes. This definition
1121                            // must not include any dropped subsources.
1122                            ingestion.description.source_exports.retain(|export_id, _| {
1123                                !self.storage_state.dropped_ids.contains(export_id)
1124                            });
1125
1126                            // After clearing any dropped subsources, we can
1127                            // state that we expect all of these to exist.
1128                            expected_objects.extend(ingestion.description.collection_ids());
1129                        }
1130
1131                        let running_ingestion = self.storage_state.ingestions.get(&ingestion.id);
1132
1133                        // We keep only:
1134                        // - The most recent version of the ingestion, which
1135                        //   is why these commands are run in reverse.
1136                        // - Ingestions whose descriptions are not exactly
1137                        //   those that are currently running.
1138                        should_keep = most_recent_defintion
1139                            && running_ingestion != Some(&ingestion.description)
1140                    }
1141                }
                StorageCommand::RunSink(export) => {
                    if drop_commands.remove(&export.id)
                        // If there were multiple `RunSink` commands in the
                        // command stream, we want to ensure none of them are
                        // retained.
                        || self.storage_state.dropped_ids.contains(&export.id)
                    {
                        info!(%worker_id, %export.id, "reconcile: dropping sink");

                        // Make sure that we report back that the ID was
                        // dropped.
                        self.storage_state.dropped_ids.push(export.id);

                        should_keep = false;
                    } else {
                        expected_objects.insert(export.id);

                        let running_sink = self.storage_state.exports.get(&export.id);

                        // We keep only:
                        // - The most recent version of the sink, which
                        //   is why these commands are run in reverse.
                        // - Sinks whose descriptions are not exactly
                        //   those that are currently running.
                        should_keep = seen_most_recent_definition.insert(export.id)
                            && running_sink != Some(&export.description);
                    }
                }
                StorageCommand::RunOneshotIngestion(ingestion) => {
                    let already_running = self
                        .storage_state
                        .oneshot_ingestions
                        .contains_key(&ingestion.ingestion_id);
                    let was_canceled = cancel_oneshot_ingestions.contains(&ingestion.ingestion_id);

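                    // Keep the request only if it is genuinely new: neither
                    // already running nor canceled elsewhere in this command
                    // stream.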
                    should_keep = !already_running && !was_canceled;
                }
                StorageCommand::CancelOneshotIngestion(ingestion_id) => {
                    let already_running = self
                        .storage_state
                        .oneshot_ingestions
                        .contains_key(ingestion_id);
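                    // A cancelation is meaningful only if the ingestion is
                    // actually running; otherwise it would be a no-op and can
                    // be dropped.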
                    should_keep = already_running;
                }
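                // Commands that are not tied to a particular object pass
                // through reconciliation unchanged.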
                StorageCommand::InitializationComplete
                | StorageCommand::AllowWrites
                | StorageCommand::UpdateConfiguration(_)
                | StorageCommand::AllowCompaction(_, _) => (),
            }
            if should_keep {
                filtered_commands.push_front(command);
            }
        }
        let commands = filtered_commands;
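
        // For example, if the controller re-sent
        //   [RunIngestion(A, v1), RunIngestion(A, v2)]
        // the reverse traversal above keeps at most v2, and drops even that
        // if v2 matches the ingestion that is already running.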

        // Make sure all the "drop commands" matched up with a source or sink.
        // This is also what the regular handler logic for `AllowCompaction`
        // would do.
        soft_assert_or_log!(
            drop_commands.is_empty(),
            "AllowCompaction commands for non-existent IDs {:?}",
            drop_commands
        );

        // Determine the IDs of all objects we did _not_ see; these are
        // considered stale.
        let stale_objects = self
            .storage_state
            .ingestions
            .values()
            .flat_map(|i| i.collection_ids())
            .chain(self.storage_state.exports.keys().copied())
            // Objects are considered stale if we did not see them re-created.
            .filter(|id| !expected_objects.contains(id))
            .collect::<Vec<_>>();
        let stale_oneshot_ingestions = self
            .storage_state
            .oneshot_ingestions
            .keys()
            .filter(|ingestion_id| {
                let to_create = create_oneshot_ingestions.contains(ingestion_id);
                let to_drop = cancel_oneshot_ingestions.contains(ingestion_id);
                mz_ore::soft_assert_or_log!(
                    to_create || !to_drop,
                    "attempting to drop oneshot source {ingestion_id} that is not expected to be created during reconciliation"
                );
                !to_create && !to_drop
            })
            .copied()
            .collect::<Vec<_>>();
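
        // For example, an ingestion that was running before reconciliation
        // but whose `RunIngestion` command was not re-sent by the new
        // controller ends up in `stale_objects` and is torn down below.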

        info!(
            %worker_id, ?expected_objects, ?stale_objects, ?stale_oneshot_ingestions,
            "reconcile: modifying storage state to match expected objects",
        );

        for id in stale_objects {
            self.storage_state.drop_collection(id);
        }
        for id in stale_oneshot_ingestions {
            self.storage_state.drop_oneshot_ingestion(id);
        }

        // Do not report dropping any objects that are not among the expected
        // objects.
        self.storage_state
            .dropped_ids
            .retain(|id| expected_objects.contains(id));

        // Do not report any frontiers that do not belong to expected objects.
        // Note that this set of objects can differ from the set of sources and
        // sinks.
        self.storage_state
            .reported_frontiers
            .retain(|id, _| expected_objects.contains(id));

        // Reset the reported frontiers for the remaining objects.
        for (_, frontier) in &mut self.storage_state.reported_frontiers {
            *frontier = Antichain::from_elem(<_>::minimum());
        }
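
        // Starting from the minimum frontier means the next round of frontier
        // reporting tells the (possibly new) client about every remaining
        // object's progress from scratch.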

        // Reset the initial status reported flag when a new client connects.
        self.storage_state.initial_status_reported = false;

        // Execute the modified commands.
        for command in commands {
            self.storage_state.handle_storage_command(command);
        }

        Ok(())
    }
}

impl StorageState {
    /// Entry point for applying a storage command.
    ///
    /// NOTE: This does not have access to the timely worker and therefore
    /// cannot render dataflows. For dataflow rendering, it needs to either
    /// send asynchronous commands to the `async_worker` or internal
    /// commands to the `internal_cmd_tx`.
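    ///
    /// A minimal sketch of the dispatch, with a hypothetical command source
    /// (not the actual call site):
    ///
    /// ```ignore
    /// while let Ok(cmd) = command_rx.try_recv() {
    ///     storage_state.handle_storage_command(cmd);
    /// }
    /// ```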
    pub fn handle_storage_command(&mut self, cmd: StorageCommand) {
        match cmd {
            StorageCommand::Hello { .. } => panic!("Hello must be captured before"),
            StorageCommand::InitializationComplete => (),
            StorageCommand::AllowWrites => {
                self.read_only_tx
                    .send(false)
                    .expect("we're holding one other end");
                self.persist_clients.cfg().enable_compaction();
            }
            StorageCommand::UpdateConfiguration(params) => {
                // These can be done from all workers safely.
                debug!("Applying configuration update: {params:?}");

                // We serialize the dyncfg updates in StorageParameters, but configure
                // persist separately.
                self.persist_clients
                    .cfg()
                    .apply_from(&params.dyncfg_updates);

                params.tracing.apply(self.tracing_handle.as_ref());

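                // librdkafka maintains its own log level, so derive it from
                // the updated filter rather than relying on the tracing
                // subscriber alone.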
                if let Some(log_filter) = &params.tracing.log_filter {
                    self.storage_configuration
                        .connection_context
                        .librdkafka_log_level =
                        mz_ore::tracing::crate_level(&log_filter.clone().into(), "librdkafka");
                }

                // This needs to be broadcast by one worker and go through
                // the internal command fabric, to ensure consistent
                // ordering of dataflow rendering across all workers.
                if self.timely_worker_index == 0 {
                    self.internal_cmd_tx
                        .send(InternalStorageCommand::UpdateConfiguration {
                            storage_parameters: *params,
                        });
                }
            }
            StorageCommand::RunIngestion(ingestion) => {
                let RunIngestionCommand { id, description } = *ingestion;

                // Remember the ingestion description to facilitate possible
                // reconciliation later.
                self.ingestions.insert(id, description.clone());

                // Initialize shared frontier reporting.
                for id in description.collection_ids() {
                    self.reported_frontiers
                        .entry(id)
                        .or_insert_with(|| Antichain::from_elem(mz_repr::Timestamp::minimum()));
                }

                // This needs to be done by one worker, which will broadcast a
                // `CreateIngestionDataflow` command to all workers based on the response that
                // contains the resumption upper.
                //
                // Doing this separately on each worker could lead to differing resume_uppers
                // which might lead to all kinds of mayhem.
                //
                // n.b. the ingestion on each worker uses the description from worker 0, not the
                // ingestion in the local storage state. This is something we might want to fix
                // in the future, e.g. materialize#19907
                if self.timely_worker_index == 0 {
                    self.async_worker
                        .update_ingestion_frontiers(id, description);
                }
            }
            StorageCommand::RunOneshotIngestion(oneshot) => {
                if self.timely_worker_index == 0 {
                    self.internal_cmd_tx
                        .send(InternalStorageCommand::RunOneshotIngestion {
                            ingestion_id: oneshot.ingestion_id,
                            collection_id: oneshot.collection_id,
                            collection_meta: oneshot.collection_meta,
                            request: oneshot.request,
                        });
                }
            }
            StorageCommand::CancelOneshotIngestion(id) => {
                self.drop_oneshot_ingestion(id);
            }
            StorageCommand::RunSink(export) => {
                // Remember the sink description to facilitate possible
                // reconciliation later.
                let prev = self.exports.insert(export.id, export.description.clone());

                // New sink, add state.
                if prev.is_none() {
                    self.reported_frontiers.insert(
                        export.id,
                        Antichain::from_elem(mz_repr::Timestamp::minimum()),
                    );
                }

                // This needs to be broadcast by one worker and go through the internal command
                // fabric, to ensure consistent ordering of dataflow rendering across all
                // workers.
                if self.timely_worker_index == 0 {
                    self.internal_cmd_tx
                        .send(InternalStorageCommand::RunSinkDataflow(
                            export.id,
                            export.description,
                        ));
                }
            }
            StorageCommand::AllowCompaction(id, frontier) => {
                soft_assert_or_log!(
                    self.exports.contains_key(&id) || self.reported_frontiers.contains_key(&id),
                    "AllowCompaction command for non-existent {id}"
                );

                if frontier.is_empty() {
                    // Indicates that we may drop `id`, as there are no more valid times to read.
                    self.drop_collection(id);
                }
            }
        }
    }

    /// Drop the identified storage collection from the storage state.
    fn drop_collection(&mut self, id: GlobalId) {
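        // Failpoint for tests that exercise crash recovery while a drop is in
        // flight (see the `fail` crate).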
        fail_point!("crash_on_drop");

        self.ingestions.remove(&id);
        self.exports.remove(&id);

        let _ = self.latest_status_updates.remove(&id);

        // This will stop reporting of frontiers.
        //
        // If this object still has its frontiers reported, we will notify the
        // client envd of the drop.
        if self.reported_frontiers.remove(&id).is_some() {
            // The only actions left are internal cleanup, so we can commit to
            // the client that these objects have been dropped.
            //
            // This must be done now rather than in response to `DropDataflow`,
            // otherwise we introduce the possibility of a timing issue where:
            // - We remove all tracking state from the storage state and send
            //   `DropDataflow` (i.e. this block).
            // - While waiting to process that command, we reconcile with a new
            //   envd. That envd has already committed to its catalog that this
            //   object no longer exists.
            // - We process the `DropDataflow` command, and identify that this
            //   object has been dropped.
            // - The next time `dropped_ids` is processed, we send a response
            //   that this ID has been dropped, but the upstream state has no
            //   record of that object having ever existed.
            self.dropped_ids.push(id);
        }

        // Send this through the async worker to ensure correct ordering with
        // respect to `RunIngestion` commands; the dataflow itself is dropped
        // when the async worker responds.
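        //
        // For example, a `RunIngestion` immediately followed by a drop of the
        // same ID must be observed by the async worker in that order; routing
        // the drop differently could let it race with the dataflow's creation.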
        if self.timely_worker_index == 0 {
            self.async_worker.drop_dataflow(id);
        }
    }

    /// Drop the identified oneshot ingestion from the storage state.
    fn drop_oneshot_ingestion(&mut self, ingestion_id: uuid::Uuid) {
        let prev = self.oneshot_ingestions.remove(&ingestion_id);
        info!(%ingestion_id, existed = %prev.is_some(), "dropping oneshot ingestion");
    }
}