mz_storage/storage_state.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Worker-local state for storage timely instances.
7//!
8//! One instance of a [`Worker`], along with its contained [`StorageState`], is
9//! part of an ensemble of storage workers that all run inside the same timely
10//! cluster. We call this worker a _storage worker_ to disambiguate it from
11//! other kinds of workers, potentially of other components, that might be sharing
12//! the same timely cluster.
13//!
14//! ## Controller and internal communication
15//!
16//! A worker receives _external_ [`StorageCommands`](StorageCommand) from the
17//! storage controller, via a channel. Storage workers also share an _internal_
18//! control/command fabric ([`internal_control`]). Internal commands go through
19//! a sequencer dataflow that ensures that all workers receive all commands in
20//! the same consistent order.
21//!
22//! We need to make sure that commands that cause dataflows to be rendered are
23//! processed in the same consistent order across all workers because timely
24//! requires this. To achieve this, we make sure that only internal commands can
25//! cause dataflows to be rendered. External commands (from the controller)
26//! cause internal commands to be broadcast (by only one worker), to get
27//! dataflows rendered.
28//!
29//! The internal command fabric is also used to broadcast messages from a local
30//! operator/worker to all workers. For example, when we need to tear down and
31//! restart a dataflow on all workers when an error is encountered.
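//!
//! For example, a single worker can ask all workers to tear down and restart a
//! dataflow by broadcasting an internal command. A minimal sketch, assuming a
//! handle to the sender (the real call sites live in the error-handling and
//! rendering code below):
//!
//! ```ignore
//! storage_state.internal_cmd_tx.send(InternalStorageCommand::SuspendAndRestart {
//!     id,
//!     reason: "transient source error".to_string(),
//! });
//! ```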
32//!
33//! ## Async Storage Worker
34//!
35//! The storage worker has a companion [`AsyncStorageWorker`] that must be used
36//! when running code that requires `async`. This is needed because a timely
37//! main loop cannot run `async` code.
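//!
//! The interaction is a hand-off plus an asynchronous reply. A simplified
//! sketch of the flow described in the next section:
//!
//! ```ignore
//! // In the timely main loop: hand work that needs `async` to the companion.
//! storage_state.async_worker.update_frontiers(id, ingestion_description);
//!
//! // Later, drain responses without blocking the main loop.
//! while let Ok(response) = storage_state.async_worker.try_recv() {
//!     worker.handle_async_worker_response(response);
//! }
//! ```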
38//!
39//! ## Example flow of commands for `RunIngestion`
40//!
41//! With external commands, internal commands, and the async worker,
42//! understanding where and how commands from the controller are realized can
43//! get complicated. We will follow the complete flow for `RunIngestion`, as an
44//! example:
45//!
46//! 1. Worker receives a [`StorageCommand::RunIngestion`] command from the
47//!    controller.
48//! 2. This command is processed in [`StorageState::handle_storage_command`].
49//!    This step cannot render dataflows, because it does not have access to the
50//!    timely worker. It only sets up state that persists for the whole
51//!    lifetime of the source, such as the `reported_frontier`. Putting this
52//!    reported frontier in place enables frontier reporting for that source. We
53//!    do not start reporting when we only see an internal command for
54//!    rendering a dataflow, which can "overtake" the external `RunIngestion`
55//!    command.
56//! 3. During processing of that command, we call
57//!    [`AsyncStorageWorker::update_frontiers`], which causes a command to
58//!    be sent to the async worker.
59//! 4. We eventually get a response from the async worker:
60//!    [`AsyncStorageWorkerResponse::FrontiersUpdated`].
61//! 5. This response is handled in [`Worker::handle_async_worker_response`].
62//! 6. Handling that response causes an
63//!    [`InternalStorageCommand::CreateIngestionDataflow`] to be broadcast to
64//!    all workers via the internal command fabric.
65//! 7. This message will be processed (on each worker) in
66//!    [`Worker::handle_internal_storage_command`]. This is what will cause the
67//!    required dataflow to be rendered on all workers.
68//!
69//! The process described above assumes that the `RunIngestion` is _not_ an
70//! update, i.e. it is in response to a `CREATE SOURCE`-like statement.
71//!
72//! The primary distinction when handling a `RunIngestion` that represents an
73//! update is that it might fill out new internal state in the mid-level
74//! clients on the way toward being run.
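//!
//! Steps 4 through 6 amount to turning the async worker's response into a
//! broadcast internal command, roughly as follows (a condensed sketch of
//! [`Worker::handle_async_worker_response`]):
//!
//! ```ignore
//! match async_response {
//!     AsyncStorageWorkerResponse::FrontiersUpdated {
//!         id,
//!         ingestion_description,
//!         as_of,
//!         resume_uppers,
//!         source_resume_uppers,
//!     } => {
//!         // Broadcast to all workers; each of them renders the dataflow.
//!         internal_cmd_tx.send(InternalStorageCommand::CreateIngestionDataflow {
//!             id,
//!             ingestion_description,
//!             as_of,
//!             resume_uppers,
//!             source_resume_uppers,
//!         });
//!     }
//!     // Other responses elided.
//!     _ => {}
//! }
//! ```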
75
76use std::cell::RefCell;
77use std::collections::{BTreeMap, BTreeSet, VecDeque};
78use std::path::PathBuf;
79use std::rc::Rc;
80use std::sync::Arc;
81use std::thread;
82use std::time::Duration;
83
84use crossbeam_channel::{RecvError, TryRecvError};
85use fail::fail_point;
86use mz_ore::now::NowFn;
87use mz_ore::tracing::TracingHandle;
88use mz_ore::{soft_assert_or_log, soft_panic_or_log};
89use mz_persist_client::batch::ProtoBatch;
90use mz_persist_client::cache::PersistClientCache;
91use mz_persist_client::operators::shard_source::ErrorHandler;
92use mz_repr::{GlobalId, Timestamp};
93use mz_rocksdb::config::SharedWriteBufferManager;
94use mz_storage_client::client::{
95    RunIngestionCommand, StatusUpdate, StorageCommand, StorageResponse,
96};
97use mz_storage_types::AlterCompatible;
98use mz_storage_types::configuration::StorageConfiguration;
99use mz_storage_types::connections::ConnectionContext;
100use mz_storage_types::controller::CollectionMetadata;
101use mz_storage_types::dyncfgs::STORAGE_SERVER_MAINTENANCE_INTERVAL;
102use mz_storage_types::oneshot_sources::OneshotIngestionDescription;
103use mz_storage_types::sinks::StorageSinkDesc;
104use mz_storage_types::sources::IngestionDescription;
105use mz_timely_util::builder_async::PressOnDropButton;
106use mz_txn_wal::operator::TxnsContext;
107use timely::communication::Allocate;
108use timely::order::PartialOrder;
109use timely::progress::Timestamp as _;
110use timely::progress::frontier::Antichain;
111use timely::worker::Worker as TimelyWorker;
112use tokio::sync::{mpsc, watch};
113use tokio::time::Instant;
114use tracing::{info, warn};
115use uuid::Uuid;
116
117use crate::internal_control::{
118    self, DataflowParameters, InternalCommandReceiver, InternalCommandSender,
119    InternalStorageCommand,
120};
121use crate::metrics::StorageMetrics;
122use crate::statistics::{AggregatedStatistics, SinkStatistics, SourceStatistics};
123use crate::storage_state::async_storage_worker::{AsyncStorageWorker, AsyncStorageWorkerResponse};
124
125pub mod async_storage_worker;
126
127type CommandReceiver = crossbeam_channel::Receiver<StorageCommand>;
128type ResponseSender = mpsc::UnboundedSender<StorageResponse>;
129
130/// State maintained for each worker thread.
131///
132/// Much of this state can be viewed as local variables for the worker thread,
133/// holding state that persists across function calls.
134pub struct Worker<'w, A: Allocate> {
135    /// The underlying Timely worker.
136    ///
137    /// NOTE: This is `pub` for testing.
138    pub timely_worker: &'w mut TimelyWorker<A>,
139    /// The channel over which communication handles for newly connected clients
140    /// are delivered.
141    pub client_rx: crossbeam_channel::Receiver<(CommandReceiver, ResponseSender)>,
142    /// The state associated with collection ingress and egress.
143    pub storage_state: StorageState,
144}
145
146impl<'w, A: Allocate> Worker<'w, A> {
147    /// Creates new `Worker` state from the given components.
148    pub fn new(
149        timely_worker: &'w mut TimelyWorker<A>,
150        client_rx: crossbeam_channel::Receiver<(CommandReceiver, ResponseSender)>,
151        metrics: StorageMetrics,
152        now: NowFn,
153        connection_context: ConnectionContext,
154        instance_context: StorageInstanceContext,
155        persist_clients: Arc<PersistClientCache>,
156        txns_ctx: TxnsContext,
157        tracing_handle: Arc<TracingHandle>,
158        shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
159    ) -> Self {
160        // It is very important that we only create the internal control
161        // flow/command sequencer once because a) the worker state is re-used
162        // when a new client connects and b) dataflows that have already been
163        // rendered into the timely worker are reused as well.
164        //
165        // If we created a new sequencer every time we get a new client (likely
166        // because the controller re-started and re-connected), dataflows that
167        // were rendered before would still hold a handle to the old sequencer
168        // but we would not read their commands anymore.
169        let (internal_cmd_tx, internal_cmd_rx) =
170            internal_control::setup_command_sequencer(timely_worker);
171
172        let storage_configuration =
173            StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs());
174
175        // We always initialize as read_only=true. Only when we're explicitly
176        // allowed do we switch to doing writes.
177        let (read_only_tx, read_only_rx) = watch::channel(true);
178
179        // Similar to the internal command sequencer, it is very important that
180        // we only create the async worker once because a) the worker state is
181        // re-used when a new client connects and b) commands that have already
182        // been sent and might yield a response will be lost if a new iteration
183        // of `run_client` creates a new async worker.
184        //
185        // If we created a new async worker every time we get a new client
186        // (likely because the controller re-started and re-connected), we can
187        // get into an inconsistent state where we think that a dataflow has
188        // been rendered, for example because there is an entry in
189        // `StorageState::ingestions`, while there is not yet a dataflow. This
190        // happens because the dataflow only gets rendered once we get a
191        // response from the async worker and send off an internal command.
192        //
193        // The core idea is that both the sequencer and the async worker are
194        // part of the per-worker state, and must be treated as such, meaning
195        // they must survive between invocations of `run_client`.
196
197        // TODO(aljoscha): This thread unparking business seems brittle, but that's
198        // also how the command channel works currently. We can wrap it inside a
199        // struct that holds both a channel and a `Thread`, but I don't
200        // think that would help too much.
201        let async_worker = async_storage_worker::AsyncStorageWorker::new(
202            thread::current(),
203            Arc::clone(&persist_clients),
204        );
205        let cluster_memory_limit = instance_context.cluster_memory_limit;
206
207        let storage_state = StorageState {
208            source_uppers: BTreeMap::new(),
209            source_tokens: BTreeMap::new(),
210            metrics,
211            reported_frontiers: BTreeMap::new(),
212            ingestions: BTreeMap::new(),
213            exports: BTreeMap::new(),
214            oneshot_ingestions: BTreeMap::new(),
215            now,
216            timely_worker_index: timely_worker.index(),
217            timely_worker_peers: timely_worker.peers(),
218            instance_context,
219            persist_clients,
220            txns_ctx,
221            sink_tokens: BTreeMap::new(),
222            sink_write_frontiers: BTreeMap::new(),
223            dropped_ids: Vec::new(),
224            aggregated_statistics: AggregatedStatistics::new(
225                timely_worker.index(),
226                timely_worker.peers(),
227            ),
228            shared_status_updates: Default::default(),
229            latest_status_updates: Default::default(),
230            initial_status_reported: Default::default(),
231            internal_cmd_tx,
232            internal_cmd_rx,
233            read_only_tx,
234            read_only_rx,
235            async_worker,
236            storage_configuration,
237            dataflow_parameters: DataflowParameters::new(
238                shared_rocksdb_write_buffer_manager,
239                cluster_memory_limit,
240            ),
241            tracing_handle,
242            server_maintenance_interval: Duration::ZERO,
243        };
244
245        // TODO(aljoscha): We might want `async_worker` and `internal_cmd_tx` to
246        // be fields of `Worker` instead of `StorageState`, but at least for the
247        // command flow sources and sinks need access to them. We can refactor
248        // this once we have a clearer boundary between what sources/sinks need
249        // and the full "power" of the internal command flow, which should stay
250        // internal to the worker/not be exposed to source/sink implementations.
251        Self {
252            timely_worker,
253            client_rx,
254            storage_state,
255        }
256    }
257}
258
259/// Worker-local state related to the ingress or egress of collections of data.
260pub struct StorageState {
261    /// The highest observed upper frontier for each collection.
262    ///
263    /// This is shared among all source instances, so that they can jointly advance the
264    /// frontier even as other instances are created and dropped. Ideally, the Storage
265    /// module would eventually provide one source of truth on this rather than multiple,
266    /// and we should aim for that but are not there yet.
267    pub source_uppers: BTreeMap<GlobalId, Rc<RefCell<Antichain<mz_repr::Timestamp>>>>,
268    /// Handles to created sources, keyed by ID
269    /// NB: The type of the tokens must not be changed to something other than `PressOnDropButton`
270    /// to prevent usage of custom shutdown tokens that are tricky to get right.
271    pub source_tokens: BTreeMap<GlobalId, Vec<PressOnDropButton>>,
272    /// Metrics for storage objects.
273    pub metrics: StorageMetrics,
274    /// Tracks the conditional write frontiers we have reported.
275    pub reported_frontiers: BTreeMap<GlobalId, Antichain<Timestamp>>,
276    /// Descriptions of each installed ingestion.
277    pub ingestions: BTreeMap<GlobalId, IngestionDescription<CollectionMetadata>>,
278    /// Descriptions of each installed export.
279    pub exports: BTreeMap<GlobalId, StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>,
280    /// Descriptions of oneshot ingestions that are currently running.
281    pub oneshot_ingestions: BTreeMap<uuid::Uuid, OneshotIngestionDescription<ProtoBatch>>,
282    /// A function that returns the current system time.
283    pub now: NowFn,
284    /// Index of the associated timely dataflow worker.
285    pub timely_worker_index: usize,
286    /// Peers in the associated timely dataflow worker.
287    pub timely_worker_peers: usize,
288    /// Other configuration for sources and sinks.
289    pub instance_context: StorageInstanceContext,
290    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
291    /// This is intentionally shared between workers
292    pub persist_clients: Arc<PersistClientCache>,
293    /// Context necessary for rendering txn-wal operators.
294    pub txns_ctx: TxnsContext,
295    /// Tokens that should be dropped when a dataflow is dropped to clean up
296    /// associated state.
297    /// NB: The type of the tokens must not be changed to something other than `PressOnDropButton`
298    /// to prevent usage of custom shutdown tokens that are tricky to get right.
299    pub sink_tokens: BTreeMap<GlobalId, Vec<PressOnDropButton>>,
300    /// Frontier of sink writes (all subsequent writes will be at times at or
301    /// beyond this frontier).
302    pub sink_write_frontiers: BTreeMap<GlobalId, Rc<RefCell<Antichain<Timestamp>>>>,
303    /// Collection ids that have been dropped but not yet reported as dropped
304    pub dropped_ids: Vec<GlobalId>,
305
306    /// Statistics for sources and sinks.
307    pub aggregated_statistics: AggregatedStatistics,
308
309    /// A place shared with running dataflows, so that health operators can
310    /// report status updates back to us.
311    ///
312    /// **NOTE**: Operators that append to this collection should take care to only add new
313    /// status updates if the status of the ingestion/export in question has _changed_.
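    ///
    /// A minimal sketch of how a health operator might append an update
    /// (illustrative only):
    ///
    /// ```ignore
    /// shared_status_updates.borrow_mut().push(status_update);
    /// ```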
314    pub shared_status_updates: Rc<RefCell<Vec<StatusUpdate>>>,
315
316    /// The latest status update for each object.
317    pub latest_status_updates: BTreeMap<GlobalId, StatusUpdate>,
318
319    /// Whether we have reported the initial status after connecting to a new client.
320    /// This is reset to false when a new client connects.
321    pub initial_status_reported: bool,
322
323    /// Sender for cluster-internal storage commands. These can be sent from
324    /// within workers/operators and will be distributed to all workers. For
325    /// example, for shutting down an entire dataflow from within an
326    /// operator/worker.
327    pub internal_cmd_tx: InternalCommandSender,
328    /// Receiver for cluster-internal storage commands.
329    pub internal_cmd_rx: InternalCommandReceiver,
330
331    /// When this replica/cluster is in read-only mode it must not make any
332    /// changes to external state. This flag can only be changed by a
333    /// [StorageCommand::AllowWrites].
334    ///
335    /// Everything running on this replica/cluster must obey this flag. At the
336    /// time of writing, nothing currently looks at this flag.
337    /// TODO(benesch): fix this.
338    ///
339    /// NOTE: In the future, we might want a more complicated flag, for example
340    /// something that tells us after which timestamp we are allowed to write.
341    /// In this first version we are keeping things as simple as possible!
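    ///
    /// A hypothetical check, assuming an operator holds a clone of this
    /// receiver (illustrative only, not current behavior):
    ///
    /// ```ignore
    /// if *read_only_rx.borrow() {
    ///     // Skip writes to external systems while in read-only mode.
    /// }
    /// ```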
342    pub read_only_rx: watch::Receiver<bool>,
343
344    /// Send-side for read-only state.
345    pub read_only_tx: watch::Sender<bool>,
346
347    /// Async worker companion, used for running code that requires async, which
348    /// the timely main loop cannot do.
349    pub async_worker: AsyncStorageWorker<mz_repr::Timestamp>,
350
351    /// Configuration for source and sink connections.
352    pub storage_configuration: StorageConfiguration,
353    /// Dynamically configurable parameters that control how dataflows are rendered.
354    /// NOTE(guswynn): we should consider moving these into `storage_configuration`.
355    pub dataflow_parameters: DataflowParameters,
356
357    /// A process-global handle to tracing configuration.
358    pub tracing_handle: Arc<TracingHandle>,
359
360    /// Interval at which to perform server maintenance tasks. Set to a zero interval to
361    /// perform maintenance with every `step_or_park` invocation.
362    pub server_maintenance_interval: Duration,
363}
364
365impl StorageState {
366    /// Return an error handler that triggers a suspend and restart of the corresponding storage
367    /// dataflow.
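    ///
    /// A usage sketch (the `"kafka_source"` context string is illustrative):
    ///
    /// ```ignore
    /// let error_handler = storage_state.error_handler("kafka_source", id);
    /// // Handing this to an operator lets it signal unrecoverable errors,
    /// // which broadcasts a `SuspendAndRestart` for `id` to all workers.
    /// ```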
368    pub fn error_handler(&self, context: &'static str, id: GlobalId) -> ErrorHandler {
369        let tx = self.internal_cmd_tx.clone();
370        ErrorHandler::signal(move |e| {
371            tx.send(InternalStorageCommand::SuspendAndRestart {
372                id,
373                reason: format!("{context}: {e:#}"),
374            })
375        })
376    }
377}
378
379/// Extra context for a storage instance.
380/// This is extra information that is used when rendering sources
381/// and sinks and that is not tied to the source/connection configuration itself.
382#[derive(Clone)]
383pub struct StorageInstanceContext {
384    /// A directory that can be used for scratch work.
385    pub scratch_directory: Option<PathBuf>,
386    /// A global `rocksdb::Env`, shared across ALL instances of `RocksDB` (even
387    /// across sources!). This `Env` lets us control some resources (like background threads)
388    /// process-wide.
389    pub rocksdb_env: rocksdb::Env,
390    /// The memory limit of the materialize cluster replica. This will
391    /// be used to calculate and configure the maximum inflight bytes for backpressure
392    pub cluster_memory_limit: Option<usize>,
393}
394
395impl StorageInstanceContext {
396    /// Build a new `StorageInstanceContext`.
397    pub fn new(
398        scratch_directory: Option<PathBuf>,
399        cluster_memory_limit: Option<usize>,
400    ) -> Result<Self, anyhow::Error> {
401        Ok(Self {
402            scratch_directory,
403            rocksdb_env: rocksdb::Env::new()?,
404            cluster_memory_limit,
405        })
406    }
407
408    /// Constructs a new connection context for usage in tests.
409    pub fn for_tests(rocksdb_env: rocksdb::Env) -> Self {
410        Self {
411            scratch_directory: None,
412            rocksdb_env,
413            cluster_memory_limit: None,
414        }
415    }
416}
417
418impl<'w, A: Allocate> Worker<'w, A> {
419    /// Waits for client connections and runs them to completion.
420    pub fn run(&mut self) {
421        while let Ok((rx, tx)) = self.client_rx.recv() {
422            self.run_client(rx, tx);
423        }
424    }
425
426    /// Runs this (timely) storage worker until the given `command_rx` is
427    /// disconnected.
428    ///
429    /// See the [module documentation](crate::storage_state) for this
430    /// worker's responsibilities, how it communicates with the other workers, and
431    /// how commands flow from the controller and through the workers.
432    fn run_client(&mut self, command_rx: CommandReceiver, response_tx: ResponseSender) {
433        // At this point, all workers are still reading from the command flow.
434        if self.reconcile(&command_rx).is_err() {
435            return;
436        }
437
438        // The last time we reported statistics.
439        let mut last_stats_time = Instant::now();
440
441        // The last time we did periodic maintenance.
442        let mut last_maintenance = std::time::Instant::now();
443
444        let mut disconnected = false;
445        while !disconnected {
446            let config = &self.storage_state.storage_configuration;
447            let stats_interval = config.parameters.statistics_collection_interval;
448
449            let maintenance_interval = self.storage_state.server_maintenance_interval;
450
451            let now = std::time::Instant::now();
452            // Determine if we need to perform maintenance, which is true if `maintenance_interval`
453            // time has passed since the last maintenance.
454            let sleep_duration;
455            if now >= last_maintenance + maintenance_interval {
456                last_maintenance = now;
457                sleep_duration = None;
458
459                self.report_frontier_progress(&response_tx);
460            } else {
461                // We didn't perform maintenance, sleep until the next maintenance interval.
462                let next_maintenance = last_maintenance + maintenance_interval;
463                sleep_duration = Some(next_maintenance.saturating_duration_since(now))
464            }
465
466            // Ask Timely to execute a unit of work.
467            //
468            // If there are no pending commands or responses from the async
469            // worker, we ask Timely to park the thread if there's nothing to
470            // do. We rely on another thread unparking us when there's new work
471            // to be done, e.g., when sending a command or when new Kafka
472            // messages have arrived.
473            //
474            // It is critical that we allow Timely to park iff there are no
475            // pending commands or responses. The command may have already been
476            // consumed by the call to `client_rx.recv`. See:
477            // https://github.com/MaterializeInc/materialize/pull/13973#issuecomment-1200312212
478            if command_rx.is_empty() && self.storage_state.async_worker.is_empty() {
479                // Make sure we wake up again to report any pending statistics updates.
480                let mut park_duration = stats_interval.saturating_sub(last_stats_time.elapsed());
481                if let Some(sleep_duration) = sleep_duration {
482                    park_duration = std::cmp::min(sleep_duration, park_duration);
483                }
484                self.timely_worker.step_or_park(Some(park_duration));
485            } else {
486                self.timely_worker.step();
487            }
488
489            // Report any dropped ids
490            for id in std::mem::take(&mut self.storage_state.dropped_ids) {
491                self.send_storage_response(&response_tx, StorageResponse::DroppedId(id));
492            }
493
494            self.process_oneshot_ingestions(&response_tx);
495
496            self.report_status_updates(&response_tx);
497
498            if last_stats_time.elapsed() >= stats_interval {
499                self.report_storage_statistics(&response_tx);
500                last_stats_time = Instant::now();
501            }
502
503            // Handle any received commands.
504            loop {
505                match command_rx.try_recv() {
506                    Ok(cmd) => self.storage_state.handle_storage_command(cmd),
507                    Err(TryRecvError::Empty) => break,
508                    Err(TryRecvError::Disconnected) => {
509                        disconnected = true;
510                        break;
511                    }
512                }
513            }
514
515            // Handle responses from the async worker.
516            while let Ok(response) = self.storage_state.async_worker.try_recv() {
517                self.handle_async_worker_response(response);
518            }
519
520            // Handle any received internal commands.
521            while let Some(command) = self.storage_state.internal_cmd_rx.try_recv() {
522                self.handle_internal_storage_command(command);
523            }
524        }
525    }
526
527    /// Entry point for applying a response from the async storage worker.
528    pub fn handle_async_worker_response(
529        &self,
530        async_response: AsyncStorageWorkerResponse<mz_repr::Timestamp>,
531    ) {
532        // NOTE: If we want to share the load of async processing we
533        // have to change `handle_storage_command` and change this
534        // assert.
535        assert_eq!(
536            self.timely_worker.index(),
537            0,
538            "only worker #0 is doing async processing"
539        );
540        match async_response {
541            AsyncStorageWorkerResponse::FrontiersUpdated {
542                id,
543                ingestion_description,
544                as_of,
545                resume_uppers,
546                source_resume_uppers,
547            } => {
548                self.storage_state.internal_cmd_tx.send(
549                    InternalStorageCommand::CreateIngestionDataflow {
550                        id,
551                        ingestion_description,
552                        as_of,
553                        resume_uppers,
554                        source_resume_uppers,
555                    },
556                );
557            }
558            AsyncStorageWorkerResponse::DropDataflow(id) => {
559                self.storage_state
560                    .internal_cmd_tx
561                    .send(InternalStorageCommand::DropDataflow(vec![id]));
562            }
563        }
564    }
565
566    /// Entry point for applying an internal storage command.
567    pub fn handle_internal_storage_command(&mut self, internal_cmd: InternalStorageCommand) {
568        match internal_cmd {
569            InternalStorageCommand::SuspendAndRestart { id, reason } => {
570                info!(
571                    "worker {}/{} initiating suspend-and-restart for {id} because of: {reason}",
572                    self.timely_worker.index(),
573                    self.timely_worker.peers(),
574                );
575
576                let maybe_ingestion = self.storage_state.ingestions.get(&id).cloned();
577                if let Some(ingestion_description) = maybe_ingestion {
578                    // Yank the token of the previously existing source dataflow. Note that this
579                    // token also includes any source exports/subsources.
580                    let maybe_token = self.storage_state.source_tokens.remove(&id);
581                    if maybe_token.is_none() {
582                        // Something has dropped the source. Make sure we don't
583                        // accidentally re-create it.
584                        return;
585                    }
586
587                    // This needs to be done by one worker, which will
588                    // broadcast a `CreateIngestionDataflow` command to all
589                    // workers based on the response that contains the
590                    // resumption upper.
591                    //
592                    // Doing this separately on each worker could lead to
593                    // differing resume_uppers which might lead to all kinds of
594                    // mayhem.
595                    //
596                    // TODO(aljoscha): If we ever become worried that this is
597                    // putting undue pressure on worker 0 we can pick the
598                    // designated worker for a source/sink based on `id.hash()`.
599                    if self.timely_worker.index() == 0 {
600                        for (id, _) in ingestion_description.source_exports.iter() {
601                            self.storage_state
602                                .aggregated_statistics
603                                .advance_global_epoch(*id);
604                        }
605                        self.storage_state
606                            .async_worker
607                            .update_frontiers(id, ingestion_description);
608                    }
609
610                    // Continue with other commands.
611                    return;
612                }
613
614                let maybe_sink = self.storage_state.exports.get(&id).cloned();
615                if let Some(sink_description) = maybe_sink {
616                    // Yank the token of the previously existing sink
617                    // dataflow.
618                    let maybe_token = self.storage_state.sink_tokens.remove(&id);
619
620                    if maybe_token.is_none() {
621                        // Something has dropped the sink. Make sure we don't
622                        // accidentally re-create it.
623                        return;
624                    }
625
626                    // This needs to be broadcast by one worker and go through
627                    // the internal command fabric, to ensure consistent
628                    // ordering of dataflow rendering across all workers.
629                    if self.timely_worker.index() == 0 {
630                        self.storage_state
631                            .aggregated_statistics
632                            .advance_global_epoch(id);
633                        self.storage_state.internal_cmd_tx.send(
634                            InternalStorageCommand::RunSinkDataflow(id, sink_description),
635                        );
636                    }
637
638                    // Continue with other commands.
639                    return;
640                }
641
642                if !self
643                    .storage_state
644                    .ingestions
645                    .values()
646                    .any(|v| v.source_exports.contains_key(&id))
647                {
648                    // Our current approach to dropping a source results in a race between shard
649                    // finalization (which happens in the controller) and dataflow shutdown (which
650                    // happens in clusterd). If a source is created and dropped fast enough, or the
651                    // two commands get sufficiently delayed, then it's possible to receive a
652                    // SuspendAndRestart command for an unknown source. We cannot assert that this
653                    // never happens, but we log a warning here to track how often it does.
654                    warn!(
655                        "got InternalStorageCommand::SuspendAndRestart for something that is not a source or sink: {id}"
656                    );
657                }
658            }
659            InternalStorageCommand::CreateIngestionDataflow {
660                id: ingestion_id,
661                mut ingestion_description,
662                as_of,
663                mut resume_uppers,
664                mut source_resume_uppers,
665            } => {
666                info!(
667                    ?as_of,
668                    ?resume_uppers,
669                    "worker {}/{} trying to (re-)start ingestion {ingestion_id}",
670                    self.timely_worker.index(),
671                    self.timely_worker.peers(),
672                );
673
674                // We initialize statistics before we prune finished exports. We
675                // still want to report statistics for these, and the rendering
676                // machinery will get confused if there are no
677                // statistics for the "main" source.
678                for (export_id, export) in ingestion_description.source_exports.iter() {
679                    let resume_upper = resume_uppers[export_id].clone();
680                    self.storage_state.aggregated_statistics.initialize_source(
681                        *export_id,
682                        resume_upper.clone(),
683                        || {
684                            SourceStatistics::new(
685                                *export_id,
686                                self.storage_state.timely_worker_index,
687                                &self.storage_state.metrics.source_statistics,
688                                ingestion_id,
689                                &export.storage_metadata.data_shard,
690                                export.data_config.envelope.clone(),
691                                resume_upper,
692                            )
693                        },
694                    );
695                }
696
697                let finished_exports: BTreeSet<GlobalId> = resume_uppers
698                    .iter()
699                    .filter(|(_, frontier)| frontier.is_empty())
700                    .map(|(id, _)| *id)
701                    .collect();
702
703                resume_uppers.retain(|id, _| !finished_exports.contains(id));
704                source_resume_uppers.retain(|id, _| !finished_exports.contains(id));
705                ingestion_description
706                    .source_exports
707                    .retain(|id, _| !finished_exports.contains(id));
708
709                for id in ingestion_description.collection_ids() {
710                    // If there is already a shared upper, we re-use it, to make
711                    // sure that parties that are already using the shared upper
712                    // can continue doing so.
713                    let source_upper = self
714                        .storage_state
715                        .source_uppers
716                        .entry(id.clone())
717                        .or_insert_with(|| {
718                            Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())))
719                        });
720
721                    let mut source_upper = source_upper.borrow_mut();
722                    if !source_upper.is_empty() {
723                        source_upper.clear();
724                        source_upper.insert(mz_repr::Timestamp::minimum());
725                    }
726                }
727
728                // If all subsources of the source are finished, we can skip rendering entirely.
729                // Also, if `as_of` is empty, the dataflow has been finalized, so we can skip it as
730                // well.
731                //
732                // TODO(guswynn|petrosagg): this is a bit hacky, and is a consequence of storage state
733                // management being a bit of a mess. we should clean this up and remove weird if
734                // statements like this.
735                if resume_uppers.values().all(|frontier| frontier.is_empty()) || as_of.is_empty() {
736                    tracing::info!(
737                        ?resume_uppers,
738                        ?as_of,
739                        "worker {}/{} skipping building ingestion dataflow \
740                        for {ingestion_id} because the ingestion is finished",
741                        self.timely_worker.index(),
742                        self.timely_worker.peers(),
743                    );
744                    return;
745                }
746
747                crate::render::build_ingestion_dataflow(
748                    self.timely_worker,
749                    &mut self.storage_state,
750                    ingestion_id,
751                    ingestion_description,
752                    as_of,
753                    resume_uppers,
754                    source_resume_uppers,
755                );
756            }
757            InternalStorageCommand::RunOneshotIngestion {
758                ingestion_id,
759                collection_id,
760                collection_meta,
761                request,
762            } => {
763                crate::render::build_oneshot_ingestion_dataflow(
764                    self.timely_worker,
765                    &mut self.storage_state,
766                    ingestion_id,
767                    collection_id,
768                    collection_meta,
769                    request,
770                );
771            }
772            InternalStorageCommand::RunSinkDataflow(sink_id, sink_description) => {
773                info!(
774                    "worker {}/{} trying to (re-)start sink {sink_id}",
775                    self.timely_worker.index(),
776                    self.timely_worker.peers(),
777                );
778
779                {
780                    // If there is already a shared write frontier, we re-use it, to
781                    // make sure that parties that are already using the shared
782                    // frontier can continue doing so.
783                    let sink_write_frontier = self
784                        .storage_state
785                        .sink_write_frontiers
786                        .entry(sink_id.clone())
787                        .or_insert_with(|| Rc::new(RefCell::new(Antichain::new())));
788
789                    let mut sink_write_frontier = sink_write_frontier.borrow_mut();
790                    sink_write_frontier.clear();
791                    sink_write_frontier.insert(mz_repr::Timestamp::minimum());
792                }
793                self.storage_state
794                    .aggregated_statistics
795                    .initialize_sink(sink_id, || {
796                        SinkStatistics::new(
797                            sink_id,
798                            self.storage_state.timely_worker_index,
799                            &self.storage_state.metrics.sink_statistics,
800                        )
801                    });
802
803                crate::render::build_export_dataflow(
804                    self.timely_worker,
805                    &mut self.storage_state,
806                    sink_id,
807                    sink_description,
808                );
809            }
810            InternalStorageCommand::DropDataflow(ids) => {
811                for id in &ids {
812                    // Clean up per-source / per-sink state.
813                    self.storage_state.source_uppers.remove(id);
814                    self.storage_state.source_tokens.remove(id);
815
816                    self.storage_state.sink_tokens.remove(id);
817
818                    self.storage_state.aggregated_statistics.deinitialize(*id);
819                }
820            }
821            InternalStorageCommand::UpdateConfiguration { storage_parameters } => {
822                self.storage_state
823                    .dataflow_parameters
824                    .update(storage_parameters.clone());
825                self.storage_state
826                    .storage_configuration
827                    .update(storage_parameters);
828
829                // Clear out the updates as we no longer forward them to anyone else to process.
830                // We clone `StorageState::storage_configuration` many times during rendering
831                // and want to avoid cloning these unused updates.
832                self.storage_state
833                    .storage_configuration
834                    .parameters
835                    .dyncfg_updates = Default::default();
836
837                // Remember the maintenance interval locally to avoid reading it from the config set on
838                // every server iteration.
839                self.storage_state.server_maintenance_interval =
840                    STORAGE_SERVER_MAINTENANCE_INTERVAL
841                        .get(self.storage_state.storage_configuration.config_set());
842            }
843            InternalStorageCommand::StatisticsUpdate { sources, sinks } => self
844                .storage_state
845                .aggregated_statistics
846                .ingest(sources, sinks),
847        }
848    }
849
850    /// Emit information about write frontier progress, along with information that should
851    /// be made durable for this to be the case.
852    ///
853    /// The write frontier progress is "conditional" in that it is not until the information is made
854    /// durable that the data are emitted to downstream workers, and indeed they should not rely on
855    /// the completeness of what they hear until the information is made durable.
856    ///
857    /// Specifically, this sends information about new timestamp bindings created by dataflow workers,
858    /// with the understanding that if it is made durable (and ack'd back to the workers) the source will
859    /// in fact progress with this write frontier.
860    pub fn report_frontier_progress(&mut self, response_tx: &ResponseSender) {
861        let mut new_uppers = Vec::new();
862
863        // Check if any observed frontier should advance the reported frontiers.
864        for (id, frontier) in self
865            .storage_state
866            .source_uppers
867            .iter()
868            .chain(self.storage_state.sink_write_frontiers.iter())
869        {
870            let Some(reported_frontier) = self.storage_state.reported_frontiers.get_mut(id) else {
871                // Frontier reporting has not yet been started for this object.
872                // Potentially because this timely worker has not yet seen the
873                // corresponding `RunIngestion` or `RunSink` command.
874                continue;
875            };
876
877            let observed_frontier = frontier.borrow();
878
879            // Only report an update if it *advances* the frontier, not if it merely *changes* the frontier.
880            // This is protection against `frontier` lagging behind what we have conditionally reported.
881            if PartialOrder::less_than(reported_frontier, &observed_frontier) {
882                new_uppers.push((*id, observed_frontier.clone()));
883                reported_frontier.clone_from(&observed_frontier);
884            }
885        }
886
887        for (id, upper) in new_uppers {
888            self.send_storage_response(response_tx, StorageResponse::FrontierUpper(id, upper));
889        }
890    }
891
892    /// Pumps latest status updates from the buffer shared with operators and
893    /// reports any updates that need reporting.
894    pub fn report_status_updates(&mut self, response_tx: &ResponseSender) {
895        // If we haven't done the initial status report, report all current statuses
896        if !self.storage_state.initial_status_reported {
897            // We pull initially reported status updates forward to "now", so that
898            // they sort as the latest updates in internal status collections. This
899            // ensures that a newly bootstrapped envd can append status
900            // updates to internal status collections that report an accurate
901            // view as of the time when it came up.
902            let now_ts = mz_ore::now::to_datetime((self.storage_state.now)());
903            let status_updates = self
904                .storage_state
905                .latest_status_updates
906                .values()
907                .cloned()
908                .map(|mut update| {
909                    update.timestamp = now_ts.clone();
910                    update
911                });
912            for update in status_updates {
913                self.send_storage_response(response_tx, StorageResponse::StatusUpdate(update));
914            }
915            self.storage_state.initial_status_reported = true;
916        }
917
918        // Pump updates into our state and stage them for reporting.
919        for shared_update in self.storage_state.shared_status_updates.take() {
920            self.send_storage_response(
921                response_tx,
922                StorageResponse::StatusUpdate(shared_update.clone()),
923            );
924
925            self.storage_state
926                .latest_status_updates
927                .insert(shared_update.id, shared_update);
928        }
929    }
930
931    /// Report source and sink statistics back to the controller.
932    pub fn report_storage_statistics(&mut self, response_tx: &ResponseSender) {
933        let (sources, sinks) = self.storage_state.aggregated_statistics.emit_local();
934        if !sources.is_empty() || !sinks.is_empty() {
935            self.storage_state
936                .internal_cmd_tx
937                .send(InternalStorageCommand::StatisticsUpdate { sources, sinks })
938        }
939
940        let (sources, sinks) = self.storage_state.aggregated_statistics.snapshot();
941        if !sources.is_empty() || !sinks.is_empty() {
942            self.send_storage_response(
943                response_tx,
944                StorageResponse::StatisticsUpdates(sources, sinks),
945            );
946        }
947    }
948
949    /// Send a response to the coordinator.
950    fn send_storage_response(&self, response_tx: &ResponseSender, response: StorageResponse) {
951        // Ignore send errors because the coordinator is free to ignore our
952        // responses. This happens during shutdown.
953        let _ = response_tx.send(response);
954    }
955
956    fn process_oneshot_ingestions(&mut self, response_tx: &ResponseSender) {
957        use tokio::sync::mpsc::error::TryRecvError;
958
959        let mut to_remove = vec![];
960
961        for (ingestion_id, ingestion_state) in &mut self.storage_state.oneshot_ingestions {
962            loop {
963                match ingestion_state.results.try_recv() {
964                    Ok(result) => {
965                        let response = match result {
966                            Ok(maybe_batch) => maybe_batch.into_iter().map(Result::Ok).collect(),
967                            Err(err) => vec![Err(err)],
968                        };
969                        let staged_batches = BTreeMap::from([(*ingestion_id, response)]);
970                        let _ = response_tx.send(StorageResponse::StagedBatches(staged_batches));
971                    }
972                    Err(TryRecvError::Empty) => {
973                        break;
974                    }
975                    Err(TryRecvError::Disconnected) => {
976                        to_remove.push(*ingestion_id);
977                        break;
978                    }
979                }
980            }
981        }
982
983        for ingestion_id in to_remove {
984            tracing::info!(?ingestion_id, "removing oneshot ingestion");
985            self.storage_state.oneshot_ingestions.remove(&ingestion_id);
986        }
987    }
988
989    /// Extract commands until `InitializationComplete`, and make the worker
990    /// reflect those commands. If the worker cannot be made to reflect the
991    /// commands, return an error.
992    fn reconcile(&mut self, command_rx: &CommandReceiver) -> Result<(), RecvError> {
993        let worker_id = self.timely_worker.index();
994
995        // To initialize the connection, we want to drain all commands until we
996        // receive a `StorageCommand::InitializationComplete` command to form a
997        // target command state.
998        let mut commands = vec![];
999        loop {
1000            match command_rx.recv()? {
1001                StorageCommand::InitializationComplete => break,
1002                command => commands.push(command),
1003            }
1004        }
1005
1006        // Track which objects this envd expects; we will also reset their
1007        // reported frontiers to the minimum timestamp, since we don't
1008        // know which frontiers the new envd expects.
1009        let mut expected_objects = BTreeSet::new();
1010
1011        let mut drop_commands = BTreeSet::new();
1012        let mut running_ingestion_descriptions = self.storage_state.ingestions.clone();
1013        let mut running_exports_descriptions = self.storage_state.exports.clone();
1014
1015        let mut create_oneshot_ingestions: BTreeSet<Uuid> = BTreeSet::new();
1016        let mut cancel_oneshot_ingestions: BTreeSet<Uuid> = BTreeSet::new();
1017
1018        for command in &mut commands {
1019            match command {
1020                StorageCommand::CreateTimely { .. } => {
1021                    panic!("CreateTimely must be captured before")
1022                }
1023                StorageCommand::AllowCompaction(id, since) => {
1024                    info!(%worker_id, ?id, ?since, "reconcile: received AllowCompaction command");
1025
1026                    // Collect all "drop commands". These are `AllowCompaction`
1027                    // commands that compact to the empty since. Then, later, we make sure
1028                    // we retain only those `Run*` commands that are not dropped. We
1029                    // assume that the `AllowCompaction` command is ordered after the
1030                    // `Run*` commands but don't assert that.
1031                    // WIP: Should we assert?
1032                    if since.is_empty() {
1033                        drop_commands.insert(*id);
1034                    }
1035                }
1036                StorageCommand::RunIngestion(ingestion) => {
1037                    info!(%worker_id, ?ingestion, "reconcile: received RunIngestion command");
1038
1039                    // Ensure that ingestions are forward-rolling alter compatible.
1040                    let prev = running_ingestion_descriptions
1041                        .insert(ingestion.id, ingestion.description.clone());
1042
1043                    if let Some(prev_ingest) = prev {
1044                        // If the new ingestion is not exactly equal to the currently running
1045                        // ingestion, we must either track that we need to synthesize an update
1046                        // command to change the ingestion, or panic.
1047                        prev_ingest
1048                            .alter_compatible(ingestion.id, &ingestion.description)
1049                            .expect("only alter compatible ingestions permitted");
1050                    }
1051                }
1052                StorageCommand::RunSink(export) => {
1053                    info!(%worker_id, ?export, "reconcile: received RunSink command");
1054
1055                    // Ensure that exports are forward-rolling alter compatible.
1056                    let prev =
1057                        running_exports_descriptions.insert(export.id, export.description.clone());
1058
1059                    if let Some(prev_export) = prev {
1060                        prev_export
1061                            .alter_compatible(export.id, &export.description)
1062                            .expect("only alter compatible exports permitted");
1063                    }
1064                }
1065                StorageCommand::RunOneshotIngestion(ingestion) => {
1066                    info!(%worker_id, ?ingestion, "reconcile: received RunOneshotIngestion command");
1067                    create_oneshot_ingestions.insert(ingestion.ingestion_id);
1068                }
1069                StorageCommand::CancelOneshotIngestion(uuid) => {
1070                    info!(%worker_id, %uuid, "reconcile: received CancelOneshotIngestion command");
1071                    cancel_oneshot_ingestions.insert(*uuid);
1072                }
1073                StorageCommand::InitializationComplete
1074                | StorageCommand::AllowWrites
1075                | StorageCommand::UpdateConfiguration(_) => (),
1076            }
1077        }
1078
1079        let mut seen_most_recent_definition = BTreeSet::new();
1080
1081        // We iterate over this backward to ensure that we keep only the most recent ingestion
1082        // description.
1083        let mut filtered_commands = VecDeque::new();
1084        for mut command in commands.into_iter().rev() {
1085            let mut should_keep = true;
1086            match &mut command {
1087                StorageCommand::CreateTimely { .. } => {
1088                    panic!("CreateTimely must be captured before")
1089                }
1090                StorageCommand::RunIngestion(ingestion) => {
1091                    // Subsources can be dropped independently of their
1092                    // primary source, so we evaluate them in a separate
1093                    // loop.
1094                    for export_id in ingestion
1095                        .description
1096                        .source_exports
1097                        .keys()
1098                        .filter(|export_id| **export_id != ingestion.id)
1099                    {
1100                        if drop_commands.remove(export_id) {
1101                            info!(%worker_id, %export_id, "reconcile: dropping subsource");
1102                            self.storage_state.dropped_ids.push(*export_id);
1103                        }
1104                    }
1105
1106                    if drop_commands.remove(&ingestion.id)
1107                        || self.storage_state.dropped_ids.contains(&ingestion.id)
1108                    {
1109                        info!(%worker_id, %ingestion.id, "reconcile: dropping ingestion");
1110
1111                        // If an ingestion is dropped, all of its subsources
1112                        // (i.e. ingestion exports, as well as its progress
1113                        // subsource) must be dropped too.
1114                        for id in ingestion.description.collection_ids() {
1115                            drop_commands.remove(&id);
1116                            self.storage_state.dropped_ids.push(id);
1117                        }
1118                        should_keep = false;
1119                    } else {
1120                        let most_recent_definition =
1121                            seen_most_recent_definition.insert(ingestion.id);
1122
1123                        if most_recent_definition {
1124                            // If this is the most recent definition, this
1125                            // is what we will be running when
1126                            // reconciliation completes. This definition
1127                            // must not include any dropped subsources.
1128                            ingestion.description.source_exports.retain(|export_id, _| {
1129                                !self.storage_state.dropped_ids.contains(export_id)
1130                            });
1131
1132                            // After clearing any dropped subsources, we can
1133                            // state that we expect all of these to exist.
1134                            expected_objects.extend(ingestion.description.collection_ids());
1135                        }
1136
1137                        let running_ingestion = self.storage_state.ingestions.get(&ingestion.id);
1138
1139                        // We keep only:
1140                        // - The most recent version of the ingestion, which
1141                        //   is why these commands are run in reverse.
1142                        // - Ingestions whose descriptions are not exactly
1143                        //   those that are currently running.
1144                        should_keep = most_recent_definition
1145                            && running_ingestion != Some(&ingestion.description)
1146                    }
1147                }
1148                StorageCommand::RunSink(export) => {
1149                    if drop_commands.remove(&export.id)
1150                        // If there were multiple `RunSink` commands in the
1151                        // command stream, we want to ensure none of them are
1152                        // retained.
1153                        || self.storage_state.dropped_ids.contains(&export.id)
1154                    {
1155                        info!(%worker_id, %export.id, "reconcile: dropping sink");
1156
1157                        // Make sure that we report back that the ID was
1158                        // dropped.
1159                        self.storage_state.dropped_ids.push(export.id);
1160
1161                        should_keep = false;
1162                    } else {
1163                        expected_objects.insert(export.id);
1164
1165                        let running_sink = self.storage_state.exports.get(&export.id);
1166
1167                        // We only keep this command if both hold:
1168                        // - It is the most recent version of the sink,
1169                        //   which is why these commands are run in reverse.
1170                        // - Its description is not exactly the one that is
1171                        //   currently running.
1172                        should_keep = seen_most_recent_definition.insert(export.id)
1173                            && running_sink != Some(&export.description);
1174                    }
1175                }
1176                StorageCommand::RunOneshotIngestion(ingestion) => {
1177                    let already_running = self
1178                        .storage_state
1179                        .oneshot_ingestions
1180                        .contains_key(&ingestion.ingestion_id);
1181                    let was_canceled = cancel_oneshot_ingestions.contains(&ingestion.ingestion_id);
1182
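                        // Retain the command only if the oneshot ingestion is
                        // not already running and has not been cancelled by a
                        // later command in the stream.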
1183                    should_keep = !already_running && !was_canceled;
1184                }
1185                StorageCommand::CancelOneshotIngestion(ingestion_id) => {
1186                    let already_running = self
1187                        .storage_state
1188                        .oneshot_ingestions
1189                        .contains_key(ingestion_id);
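                        // Only retain the cancellation if the oneshot
                        // ingestion is actually still running; cancellations
                        // for unknown ingestions are dropped here.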
1190                    should_keep = already_running;
1191                }
1192                StorageCommand::InitializationComplete
1193                | StorageCommand::AllowWrites
1194                | StorageCommand::UpdateConfiguration(_)
1195                | StorageCommand::AllowCompaction(_, _) => (),
1196            }
1197            if should_keep {
1198                filtered_commands.push_front(command);
1199            }
1200        }
1201        let commands = filtered_commands;
1202
1203        // Make sure all the "drop commands" matched up with a source or sink.
1204        // This is also what the regular handler logic for `AllowCompaction`
1205        // would do.
1206        soft_assert_or_log!(
1207            drop_commands.is_empty(),
1208            "AllowCompaction commands for non-existent IDs {:?}",
1209            drop_commands
1210        );
1211
1212        // Determine the IDs of all objects we did _not_ see; these are
1213        // considered stale.
1214        let stale_objects = self
1215            .storage_state
1216            .ingestions
1217            .values()
1218            .map(|i| i.collection_ids())
1219            .flatten()
1220            .chain(self.storage_state.exports.keys().copied())
1221            // Objects are considered stale if we did not see them re-created.
1222            .filter(|id| !expected_objects.contains(id))
1223            .collect::<Vec<_>>();
1224        let stale_oneshot_ingestions = self
1225            .storage_state
1226            .oneshot_ingestions
1227            .keys()
1228            .filter(|ingestion_id| {
1229                let created = create_oneshot_ingestions.contains(ingestion_id);
1230                let dropped = cancel_oneshot_ingestions.contains(ingestion_id);
1231                mz_ore::soft_assert_or_log!(
1232                    !created && dropped,
1233                    "dropped non-existent oneshot source"
1234                );
1235                !created && !dropped
1236            })
1237            .copied()
1238            .collect::<Vec<_>>();
1239
1240        info!(
1241            %worker_id, ?expected_objects, ?stale_objects, ?stale_oneshot_ingestions,
1242            "reconcile: modifying storage state to match expected objects",
1243        );
1244
1245        for id in stale_objects {
1246            self.storage_state.drop_collection(id);
1247        }
1248        for id in stale_oneshot_ingestions {
1249            self.storage_state.drop_oneshot_ingestion(id);
1250        }
1251
1252        // Do not report dropping any objects that do not belong to expected
1253        // objects.
1254        self.storage_state
1255            .dropped_ids
1256            .retain(|id| expected_objects.contains(id));
1257
1258        // Do not report any frontiers that do not belong to expected objects.
1259        // Note that this set of objects can differ from the set of sources and
1260        // sinks.
1261        self.storage_state
1262            .reported_frontiers
1263            .retain(|id, _| expected_objects.contains(id));
1264
1265        // Reset the reported frontiers for the remaining objects.
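            // This way, frontier progress for these objects is re-reported to
            // the newly connected client, starting from the minimum frontier.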
1266        for frontier in self.storage_state.reported_frontiers.values_mut() {
1267            *frontier = Antichain::from_elem(<_>::minimum());
1268        }
1269
1270        // Reset the initial status reported flag when a new client connects
1271        self.storage_state.initial_status_reported = false;
1272
1273        // Execute the modified commands.
1274        for command in commands {
1275            self.storage_state.handle_storage_command(command);
1276        }
1277
1278        Ok(())
1279    }
1280}
1281
1282impl StorageState {
1283    /// Entry point for applying a storage command.
1284    ///
1285    /// NOTE: This does not have access to the timely worker and therefore
1286    /// cannot render dataflows. For dataflow rendering, this needs to either
1287    /// send asynchronous commands to the `async_worker` or internal
1288    /// commands to the `internal_cmd_tx`.
1289    pub fn handle_storage_command(&mut self, cmd: StorageCommand) {
1290        match cmd {
1291            StorageCommand::CreateTimely { .. } => panic!("CreateTimely must be captured before"),
1292            StorageCommand::InitializationComplete => (),
1293            StorageCommand::AllowWrites => {
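                    // Leave read-only mode: notify observers of the read-only
                    // watch channel and re-enable persist compaction.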
1294                self.read_only_tx
1295                    .send(false)
1296                    .expect("we're holding one other end");
1297                self.persist_clients.cfg().enable_compaction();
1298            }
1299            StorageCommand::UpdateConfiguration(params) => {
1300                // These can be done from all workers safely.
1301                tracing::info!("Applying configuration update: {params:?}");
1302
1303                // We serialize the dyncfg updates in StorageParameters, but configure
1304                // persist separately.
1305                self.persist_clients
1306                    .cfg()
1307                    .apply_from(&params.dyncfg_updates);
1308
1309                params.tracing.apply(self.tracing_handle.as_ref());
1310
1311                if let Some(log_filter) = &params.tracing.log_filter {
1312                    self.storage_configuration
1313                        .connection_context
1314                        .librdkafka_log_level =
1315                        mz_ore::tracing::crate_level(&log_filter.clone().into(), "librdkafka");
1316                }
1317
1318                // This needs to be broadcast by one worker and go through
1319                // the internal command fabric, to ensure consistent
1320                // ordering of dataflow rendering across all workers.
1321                if self.timely_worker_index == 0 {
1322                    self.internal_cmd_tx
1323                        .send(InternalStorageCommand::UpdateConfiguration {
1324                            storage_parameters: *params,
1325                        });
1326                }
1327            }
1328            StorageCommand::RunIngestion(ingestion) => {
1329                let RunIngestionCommand { id, description } = *ingestion;
1330
1331                // Remember the ingestion description to facilitate possible
1332                // reconciliation later.
1333                self.ingestions.insert(id, description.clone());
1334
1335                // Initialize shared frontier reporting.
1336                for id in description.collection_ids() {
1337                    self.reported_frontiers
1338                        .entry(id)
1339                        .or_insert_with(|| Antichain::from_elem(mz_repr::Timestamp::minimum()));
1340                }
1341
1342                // This needs to be done by one worker, which will broadcast a
1343                // `CreateIngestionDataflow` command to all workers based on the response that
1344                // contains the resumption upper.
1345                //
1346                // Doing this separately on each worker could lead to differing resume_uppers
1347                // which might lead to all kinds of mayhem.
1348                //
1349                // N.B. the ingestion on each worker uses the description from worker 0, not the
1350                // ingestion in the local storage state. This is something we might want to
1351                // fix in the future, e.g. materialize#19907.
1352                if self.timely_worker_index == 0 {
1353                    self.async_worker.update_frontiers(id, description);
1354                }
1355            }
1356            StorageCommand::RunOneshotIngestion(oneshot) => {
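                    // Like regular ingestions and sinks, this must be
                    // broadcast by a single worker through the internal
                    // command fabric, so that all workers render the oneshot
                    // ingestion dataflow in a consistent order.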
1357                if self.timely_worker_index == 0 {
1358                    self.internal_cmd_tx
1359                        .send(InternalStorageCommand::RunOneshotIngestion {
1360                            ingestion_id: oneshot.ingestion_id,
1361                            collection_id: oneshot.collection_id,
1362                            collection_meta: oneshot.collection_meta,
1363                            request: oneshot.request,
1364                        });
1365                }
1366            }
1367            StorageCommand::CancelOneshotIngestion(id) => {
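                    // Every worker tears down its local state for the oneshot
                    // ingestion directly; see `drop_oneshot_ingestion` below.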
1368                self.drop_oneshot_ingestion(id);
1369            }
1370            StorageCommand::RunSink(export) => {
1371                // Remember the sink description to facilitate possible
1372                // reconciliation later.
1373                let prev = self.exports.insert(export.id, export.description.clone());
1374
1375                // New sink, add state.
1376                if prev.is_none() {
1377                    self.reported_frontiers.insert(
1378                        export.id,
1379                        Antichain::from_elem(mz_repr::Timestamp::minimum()),
1380                    );
1381                }
1382
1383                // This needs to be broadcast by one worker and go through the internal command
1384                // fabric, to ensure consistent ordering of dataflow rendering across all
1385                // workers.
1386                if self.timely_worker_index == 0 {
1387                    self.internal_cmd_tx
1388                        .send(InternalStorageCommand::RunSinkDataflow(
1389                            export.id,
1390                            export.description,
1391                        ));
1392                }
1393            }
1394            StorageCommand::AllowCompaction(id, frontier) => {
1395                match self.exports.get_mut(&id) {
1396                    Some(export_description) => {
1397                        // Update our knowledge of the `as_of`, in case we need to internally
1398                        // restart a sink in the future.
1399                        export_description.as_of.clone_from(&frontier);
1400                    }
1401                    // `reported_frontiers` contains both ingestions and
1402                    // their exports.
1403                    None if self.reported_frontiers.contains_key(&id) => (),
1404                    None => {
1405                        soft_panic_or_log!("AllowCompaction command for non-existent {id}");
1406                    }
1407                }
1408
1409                if frontier.is_empty() {
1410                    // Indicates that we may drop `id`, as there are no more valid times to read.
1411                    self.drop_collection(id);
1412                }
1413            }
1414        }
1415    }
1416
1417    /// Drop the identified storage collection from the storage state.
1418    fn drop_collection(&mut self, id: GlobalId) {
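            // Failure-injection point (via the `fail` crate) that can be
            // activated, e.g. in tests, to simulate a crash while dropping a
            // collection.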
1419        fail_point!("crash_on_drop");
1420
1421        self.ingestions.remove(&id);
1422        self.exports.remove(&id);
1423
1424        let _ = self.latest_status_updates.remove(&id);
1425
1426        // This will stop reporting of frontiers.
1427        //
1428        // If this object still has its frontiers reported, we will notify the
1429        // client envd of the drop.
1430        if self.reported_frontiers.remove(&id).is_some() {
1431            // The only actions left are internal cleanup, so we can commit to
1432            // the client that these objects have been dropped.
1433            //
1434            // This must be done now rather than in response to `DropDataflow`,
1435            // otherwise we introduce the possibility of a timing issue where:
1436            // - We remove all tracking state from the storage state and send
1437            //   `DropDataflow` (i.e. this block).
1438            // - While waiting to process that command, we reconcile with a new
1439            //   envd. That envd has already committed to its catalog that this
1440            //   object no longer exists.
1441            // - We process the `DropDataflow` command, and identify that this
1442            //   object has been dropped.
1443            // - The next time `dropped_ids` is processed, we send a response
1444            //   that this ID has been dropped, but the upstream state has no
1445            //   record of that object having ever existed.
1446            self.dropped_ids.push(id);
1447        }
1448
1449        // Route this through the async worker to ensure correct ordering with
1450        // RunIngestion; the dataflow itself is dropped when the async worker responds.
1451        if self.timely_worker_index == 0 {
1452            self.async_worker.drop_dataflow(id);
1453        }
1454    }
1455
1456    /// Drop the identified oneshot ingestion from the storage state.
1457    fn drop_oneshot_ingestion(&mut self, ingestion_id: uuid::Uuid) {
1458        let prev = self.oneshot_ingestions.remove(&ingestion_id);
1459        tracing::info!(%ingestion_id, existed = %prev.is_some(), "dropping oneshot ingestion");
1460    }
1461}