mz_storage/storage_state.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

//! Worker-local state for storage timely instances.
//!
//! One instance of a [`Worker`], along with its contained [`StorageState`], is
//! part of an ensemble of storage workers that all run inside the same timely
//! cluster. We call this worker a _storage worker_ to disambiguate it from
//! other kinds of workers, belonging to other components that might be sharing
//! the same timely cluster.
//!
//! ## Controller and internal communication
//!
//! A worker receives _external_ [`StorageCommands`](StorageCommand) from the
//! storage controller, via a channel. Storage workers also share an _internal_
//! control/command fabric ([`internal_control`]). Internal commands go through
//! a sequencer dataflow that ensures that all workers receive all commands in
//! the same consistent order.
//!
//! We need to make sure that commands that cause dataflows to be rendered are
//! processed in the same consistent order across all workers because timely
//! requires this. To achieve this, we make sure that only internal commands can
//! cause dataflows to be rendered. External commands (from the controller)
//! cause internal commands to be broadcast (by only one worker), to get
//! dataflows rendered.
//!
//! The internal command fabric is also used to broadcast messages from a local
//! operator/worker to all workers. For example, when an error is encountered
//! and we need to tear down and restart a dataflow on all workers.
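//!
//! As a hedged sketch (assuming access to a `StorageState`, and simplified from
//! the real call sites in this module), broadcasting such an internal command
//! looks roughly like this:
//!
//! ```ignore
//! // Ask all workers to tear down and restart the dataflow for `id`.
//! storage_state
//!     .internal_cmd_tx
//!     .send(InternalStorageCommand::SuspendAndRestart {
//!         id,
//!         reason: "example: transient source error".to_string(),
//!     });
//! ```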
//!
//! ## Async Storage Worker
//!
//! The storage worker has a companion [`AsyncStorageWorker`] that must be used
//! when running code that requires `async`. This is needed because a timely
//! main loop cannot run `async` code.
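//!
//! A hedged sketch (simplified, not the exact call sites) of how work is handed
//! to the async worker and how its responses are drained back in the main loop:
//!
//! ```ignore
//! // Hand off work that needs `async`, e.g. determining resumption frontiers.
//! storage_state
//!     .async_worker
//!     .update_frontiers(ingestion_id, ingestion_description);
//!
//! // Later, in the worker's main loop, drain responses without blocking.
//! while let Ok(response) = storage_state.async_worker.try_recv() {
//!     worker.handle_async_worker_response(response);
//! }
//! ```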
//!
//! ## Example flow of commands for `RunIngestion`
//!
//! With external commands, internal commands, and the async worker,
//! understanding where and how commands from the controller are realized can
//! get complicated. We will follow the complete flow for `RunIngestion`, as an
//! example (condensed in the sketch after this list):
//!
//! 1. Worker receives a [`StorageCommand::RunIngestion`] command from the
//!    controller.
//! 2. This command is processed in [`StorageState::handle_storage_command`].
//!    This step cannot render dataflows, because it does not have access to the
//!    timely worker. It will only set up state that stays around for the whole
//!    lifetime of the source, such as the `reported_frontier`. Putting in place
//!    this reported frontier will enable frontier reporting for that source. We
//!    do not start reporting based only on an internal command for rendering a
//!    dataflow, because such a command can "overtake" the external
//!    `RunIngestion` command.
//! 3. During processing of that command, we call
//!    [`AsyncStorageWorker::update_frontiers`], which causes a command to
//!    be sent to the async worker.
//! 4. We eventually get a response from the async worker:
//!    [`AsyncStorageWorkerResponse::FrontiersUpdated`].
//! 5. This response is handled in [`Worker::handle_async_worker_response`].
//! 6. Handling that response causes an
//!    [`InternalStorageCommand::CreateIngestionDataflow`] to be broadcast to
//!    all workers via the internal command fabric.
//! 7. This message will be processed (on each worker) in
//!    [`Worker::handle_internal_storage_command`]. This is what will cause the
//!    required dataflow to be rendered on all workers.
//!
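//! A condensed sketch of the flow described above:
//!
//! ```text
//! controller
//!   └─ StorageCommand::RunIngestion
//!        └─ StorageState::handle_storage_command
//!             └─ AsyncStorageWorker::update_frontiers
//!                  └─ AsyncStorageWorkerResponse::FrontiersUpdated
//!                       └─ Worker::handle_async_worker_response
//!                            └─ InternalStorageCommand::CreateIngestionDataflow
//!                               (broadcast via the internal command fabric)
//!                                 └─ Worker::handle_internal_storage_command
//!                                    (on each worker: dataflow is rendered)
//! ```
//!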
//! The process described above assumes that the `RunIngestion` is _not_ an
//! update, i.e. it is in response to a `CREATE SOURCE`-like statement.
//!
//! The primary distinction when handling a `RunIngestion` that represents an
//! update is that it might fill out new internal state in the mid-level
//! clients on the way toward being run.

use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use crossbeam_channel::{RecvError, TryRecvError};
use fail::fail_point;
use mz_ore::now::NowFn;
use mz_ore::tracing::TracingHandle;
use mz_ore::{soft_assert_or_log, soft_panic_or_log};
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::operators::shard_source::ErrorHandler;
use mz_repr::{GlobalId, Timestamp};
use mz_rocksdb::config::SharedWriteBufferManager;
use mz_storage_client::client::{
    RunIngestionCommand, StatusUpdate, StorageCommand, StorageResponse,
};
use mz_storage_types::AlterCompatible;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs::STORAGE_SERVER_MAINTENANCE_INTERVAL;
use mz_storage_types::oneshot_sources::OneshotIngestionDescription;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::IngestionDescription;
use mz_timely_util::builder_async::PressOnDropButton;
use mz_txn_wal::operator::TxnsContext;
use timely::communication::Allocate;
use timely::order::PartialOrder;
use timely::progress::Timestamp as _;
use timely::progress::frontier::Antichain;
use timely::worker::Worker as TimelyWorker;
use tokio::sync::{mpsc, watch};
use tokio::time::Instant;
use tracing::{info, warn};
use uuid::Uuid;

use crate::internal_control::{
    self, DataflowParameters, InternalCommandReceiver, InternalCommandSender,
    InternalStorageCommand,
};
use crate::metrics::StorageMetrics;
use crate::statistics::{AggregatedStatistics, SinkStatistics, SourceStatistics};
use crate::storage_state::async_storage_worker::{AsyncStorageWorker, AsyncStorageWorkerResponse};

pub mod async_storage_worker;

type CommandReceiver = crossbeam_channel::Receiver<StorageCommand>;
type ResponseSender = mpsc::UnboundedSender<StorageResponse>;

/// State maintained for each worker thread.
///
/// Much of this state can be viewed as local variables for the worker thread,
/// holding state that persists across function calls.
pub struct Worker<'w, A: Allocate> {
    /// The underlying Timely worker.
    ///
    /// NOTE: This is `pub` for testing.
    pub timely_worker: &'w mut TimelyWorker<A>,
    /// The channel over which communication handles for newly connected clients
    /// are delivered.
    pub client_rx: crossbeam_channel::Receiver<(Uuid, CommandReceiver, ResponseSender)>,
    /// The state associated with collection ingress and egress.
    pub storage_state: StorageState,
}

impl<'w, A: Allocate> Worker<'w, A> {
    /// Creates new `Worker` state from the given components.
    pub fn new(
        timely_worker: &'w mut TimelyWorker<A>,
        client_rx: crossbeam_channel::Receiver<(Uuid, CommandReceiver, ResponseSender)>,
        metrics: StorageMetrics,
        now: NowFn,
        connection_context: ConnectionContext,
        instance_context: StorageInstanceContext,
        persist_clients: Arc<PersistClientCache>,
        txns_ctx: TxnsContext,
        tracing_handle: Arc<TracingHandle>,
        shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
    ) -> Self {
        // It is very important that we only create the internal control
        // flow/command sequencer once because a) the worker state is re-used
        // when a new client connects and b) dataflows that have already been
        // rendered into the timely worker are reused as well.
        //
        // If we created a new sequencer every time we get a new client (likely
        // because the controller re-started and re-connected), dataflows that
        // were rendered before would still hold a handle to the old sequencer
        // but we would not read their commands anymore.
        let (internal_cmd_tx, internal_cmd_rx) =
            internal_control::setup_command_sequencer(timely_worker);

        let storage_configuration =
            StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs());

        // We always initialize as read_only=true. Only when we're explicitly
        // allowed do we switch to doing writes.
        let (read_only_tx, read_only_rx) = watch::channel(true);

        // Similar to the internal command sequencer, it is very important that
        // we only create the async worker once because a) the worker state is
        // re-used when a new client connects and b) commands that have already
        // been sent and might yield a response will be lost if a new iteration
        // of `run_client` creates a new async worker.
        //
        // If we created a new async worker every time we get a new client
        // (likely because the controller re-started and re-connected), we can
        // get into an inconsistent state where we think that a dataflow has
        // been rendered, for example because there is an entry in
        // `StorageState::ingestions`, while there is not yet a dataflow. This
        // happens because the dataflow only gets rendered once we get a
        // response from the async worker and send off an internal command.
        //
        // The core idea is that both the sequencer and the async worker are
        // part of the per-worker state, and must be treated as such, meaning
        // they must survive between invocations of `run_client`.

        // TODO(aljoscha): This thread unparking business seems brittle, but that's
        // also how the command channel works currently. We can wrap it inside a
        // struct that holds both a channel and a `Thread`, but I don't
        // think that would help too much.
        let async_worker = async_storage_worker::AsyncStorageWorker::new(
            thread::current(),
            Arc::clone(&persist_clients),
        );
        let cluster_memory_limit = instance_context.cluster_memory_limit;

        let storage_state = StorageState {
            source_uppers: BTreeMap::new(),
            source_tokens: BTreeMap::new(),
            metrics,
            reported_frontiers: BTreeMap::new(),
            ingestions: BTreeMap::new(),
            exports: BTreeMap::new(),
            oneshot_ingestions: BTreeMap::new(),
            now,
            timely_worker_index: timely_worker.index(),
            timely_worker_peers: timely_worker.peers(),
            instance_context,
            persist_clients,
            txns_ctx,
            sink_tokens: BTreeMap::new(),
            sink_write_frontiers: BTreeMap::new(),
            dropped_ids: Vec::new(),
            aggregated_statistics: AggregatedStatistics::new(
                timely_worker.index(),
                timely_worker.peers(),
            ),
            shared_status_updates: Default::default(),
            latest_status_updates: Default::default(),
            initial_status_reported: Default::default(),
            internal_cmd_tx,
            internal_cmd_rx,
            read_only_tx,
            read_only_rx,
            async_worker,
            storage_configuration,
            dataflow_parameters: DataflowParameters::new(
                shared_rocksdb_write_buffer_manager,
                cluster_memory_limit,
            ),
            tracing_handle,
            server_maintenance_interval: Duration::ZERO,
        };

        // TODO(aljoscha): We might want `async_worker` and `internal_cmd_tx` to
        // be fields of `Worker` instead of `StorageState`, but at least for the
        // command flow, sources and sinks need access to that. We can refactor
        // this once we have a clearer boundary between what sources/sinks need
        // and the full "power" of the internal command flow, which should stay
        // internal to the worker/not be exposed to source/sink implementations.
        Self {
            timely_worker,
            client_rx,
            storage_state,
        }
    }
}

/// Worker-local state related to the ingress or egress of collections of data.
pub struct StorageState {
    /// The highest observed upper frontier for each collection.
    ///
    /// This is shared among all source instances, so that they can jointly advance the
    /// frontier even as other instances are created and dropped. Ideally, the Storage
    /// module would eventually provide one source of truth on this rather than multiple,
    /// and we should aim for that but are not there yet.
    pub source_uppers: BTreeMap<GlobalId, Rc<RefCell<Antichain<mz_repr::Timestamp>>>>,
    /// Handles to created sources, keyed by ID.
    /// NB: The type of the tokens must not be changed to something other than `PressOnDropButton`
    /// to prevent usage of custom shutdown tokens that are tricky to get right.
    pub source_tokens: BTreeMap<GlobalId, Vec<PressOnDropButton>>,
    /// Metrics for storage objects.
    pub metrics: StorageMetrics,
    /// Tracks the conditional write frontiers we have reported.
    pub reported_frontiers: BTreeMap<GlobalId, Antichain<Timestamp>>,
    /// Descriptions of each installed ingestion.
    pub ingestions: BTreeMap<GlobalId, IngestionDescription<CollectionMetadata>>,
    /// Descriptions of each installed export.
    pub exports: BTreeMap<GlobalId, StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>,
    /// Descriptions of oneshot ingestions that are currently running.
    pub oneshot_ingestions: BTreeMap<uuid::Uuid, OneshotIngestionDescription<ProtoBatch>>,
    /// Undocumented
    pub now: NowFn,
    /// Index of the associated timely dataflow worker.
    pub timely_worker_index: usize,
    /// Peers in the associated timely dataflow worker.
    pub timely_worker_peers: usize,
    /// Other configuration for sources and sinks.
    pub instance_context: StorageInstanceContext,
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    /// This is intentionally shared between workers.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// Tokens that should be dropped when a dataflow is dropped to clean up
    /// associated state.
    /// NB: The type of the tokens must not be changed to something other than `PressOnDropButton`
    /// to prevent usage of custom shutdown tokens that are tricky to get right.
    pub sink_tokens: BTreeMap<GlobalId, Vec<PressOnDropButton>>,
    /// Frontier of sink writes (all subsequent writes will be at times greater
    /// than or equal to this frontier).
    pub sink_write_frontiers: BTreeMap<GlobalId, Rc<RefCell<Antichain<Timestamp>>>>,
    /// Collection IDs that have been dropped but not yet reported as dropped.
    pub dropped_ids: Vec<GlobalId>,

    /// Statistics for sources and sinks.
    pub aggregated_statistics: AggregatedStatistics,

    /// A place shared with running dataflows, so that health operators can
    /// report status updates back to us.
    ///
    /// **NOTE**: Operators that append to this collection should take care to only add new
    /// status updates if the status of the ingestion/export in question has _changed_.
    pub shared_status_updates: Rc<RefCell<Vec<StatusUpdate>>>,

    /// The latest status update for each object.
    pub latest_status_updates: BTreeMap<GlobalId, StatusUpdate>,

    /// Whether we have reported the initial status after connecting to a new client.
    /// This is reset to false when a new client connects.
    pub initial_status_reported: bool,

    /// Sender for cluster-internal storage commands. These can be sent from
    /// within workers/operators and will be distributed to all workers. For
    /// example, for shutting down an entire dataflow from within an
    /// operator/worker.
    pub internal_cmd_tx: InternalCommandSender,
    /// Receiver for cluster-internal storage commands.
    pub internal_cmd_rx: InternalCommandReceiver,

    /// When this replica/cluster is in read-only mode it must not make any
    /// changes to external state. This flag can only be changed by a
    /// [StorageCommand::AllowWrites].
    ///
    /// Everything running on this replica/cluster must obey this flag. At the
    /// time of writing, nothing currently looks at this flag.
    /// TODO(benesch): fix this.
    ///
    /// NOTE: In the future, we might want a more complicated flag, for example
    /// something that tells us after which timestamp we are allowed to write.
    /// In this first version we are keeping things as simple as possible!
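    ///
    /// A hedged sketch of how a write-path operator might consult this flag:
    ///
    /// ```ignore
    /// if *storage_state.read_only_rx.borrow() {
    ///     // Read-only mode: render dataflows, but do not write to external
    ///     // state (e.g. do not append to persist shards).
    /// }
    /// ```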
    pub read_only_rx: watch::Receiver<bool>,

    /// Send-side for read-only state.
    pub read_only_tx: watch::Sender<bool>,

    /// Async worker companion, used for running code that requires async, which
    /// the timely main loop cannot do.
    pub async_worker: AsyncStorageWorker<mz_repr::Timestamp>,

    /// Configuration for source and sink connections.
    pub storage_configuration: StorageConfiguration,
    /// Dynamically configurable parameters that control how dataflows are rendered.
    /// NOTE(guswynn): we should consider moving these into `storage_configuration`.
    pub dataflow_parameters: DataflowParameters,

    /// A process-global handle to tracing configuration.
    pub tracing_handle: Arc<TracingHandle>,

    /// Interval at which to perform server maintenance tasks. Set to a zero interval to
    /// perform maintenance with every `step_or_park` invocation.
    pub server_maintenance_interval: Duration,
}

impl StorageState {
    /// Return an error handler that triggers a suspend and restart of the corresponding storage
    /// dataflow.
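    ///
    /// A hedged usage sketch (hypothetical call site and context label):
    ///
    /// ```ignore
    /// // Obtain a handler that, on error, suspends and restarts the dataflow for `id`.
    /// let error_handler = storage_state.error_handler("upsert", id);
    /// ```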
    pub fn error_handler(&self, context: &'static str, id: GlobalId) -> ErrorHandler {
        let tx = self.internal_cmd_tx.clone();
        ErrorHandler::signal(move |e| {
            tx.send(InternalStorageCommand::SuspendAndRestart {
                id,
                reason: format!("{context}: {e:#}"),
            })
        })
    }
}

/// Extra context for a storage instance.
/// This is extra information that is used when rendering sources
/// and sinks that is not tied to the source/connection configuration itself.
#[derive(Clone)]
pub struct StorageInstanceContext {
    /// A directory that can be used for scratch work.
    pub scratch_directory: Option<PathBuf>,
    /// A global `rocksdb::Env`, shared across ALL instances of `RocksDB` (even
    /// across sources!). This `Env` lets us control some resources (like background threads)
    /// process-wide.
    pub rocksdb_env: rocksdb::Env,
    /// The memory limit of the materialize cluster replica. This will
    /// be used to calculate and configure the maximum inflight bytes for backpressure.
    pub cluster_memory_limit: Option<usize>,
}

impl StorageInstanceContext {
    /// Build a new `StorageInstanceContext`.
    pub fn new(
        scratch_directory: Option<PathBuf>,
        cluster_memory_limit: Option<usize>,
    ) -> Result<Self, anyhow::Error> {
        // If no file system is available, fall back to running RocksDB in memory.
        let rocksdb_env = if scratch_directory.is_some() {
            rocksdb::Env::new()?
        } else {
            rocksdb::Env::mem_env()?
        };

        Ok(Self {
            scratch_directory,
            rocksdb_env,
            cluster_memory_limit,
        })
    }
}

impl<'w, A: Allocate> Worker<'w, A> {
    /// Waits for client connections and runs them to completion.
    pub fn run(&mut self) {
        while let Ok((_nonce, rx, tx)) = self.client_rx.recv() {
            self.run_client(rx, tx);
        }
    }

    /// Runs this (timely) storage worker until the given `command_rx` is
    /// disconnected.
    ///
    /// See the [module documentation](crate::storage_state) for this
    /// worker's responsibilities, how it communicates with the other workers,
    /// and how commands flow from the controller through the workers.
    fn run_client(&mut self, command_rx: CommandReceiver, response_tx: ResponseSender) {
        // At this point, all workers are still reading from the command flow.
        if self.reconcile(&command_rx).is_err() {
            return;
        }

        // The last time we reported statistics.
        let mut last_stats_time = Instant::now();

        // The last time we did periodic maintenance.
        let mut last_maintenance = std::time::Instant::now();

        let mut disconnected = false;
        while !disconnected {
            let config = &self.storage_state.storage_configuration;
            let stats_interval = config.parameters.statistics_collection_interval;

            let maintenance_interval = self.storage_state.server_maintenance_interval;

            let now = std::time::Instant::now();
            // Determine if we need to perform maintenance, which is true if `maintenance_interval`
            // time has passed since the last maintenance.
            let sleep_duration;
            if now >= last_maintenance + maintenance_interval {
                last_maintenance = now;
                sleep_duration = None;

                self.report_frontier_progress(&response_tx);
            } else {
                // We didn't perform maintenance, sleep until the next maintenance interval.
                let next_maintenance = last_maintenance + maintenance_interval;
                sleep_duration = Some(next_maintenance.saturating_duration_since(now))
            }

            // Ask Timely to execute a unit of work.
            //
            // If there are no pending commands or responses from the async
            // worker, we ask Timely to park the thread if there's nothing to
            // do. We rely on another thread unparking us when there's new work
            // to be done, e.g., when sending a command or when new Kafka
            // messages have arrived.
            //
            // It is critical that we allow Timely to park iff there are no
            // pending commands or responses. The command may have already been
            // consumed by the call to `client_rx.recv`. See:
            // https://github.com/MaterializeInc/materialize/pull/13973#issuecomment-1200312212
            if command_rx.is_empty() && self.storage_state.async_worker.is_empty() {
                // Make sure we wake up again to report any pending statistics updates.
                let mut park_duration = stats_interval.saturating_sub(last_stats_time.elapsed());
                if let Some(sleep_duration) = sleep_duration {
                    park_duration = std::cmp::min(sleep_duration, park_duration);
                }
                self.timely_worker.step_or_park(Some(park_duration));
            } else {
                self.timely_worker.step();
            }

            // Report any dropped IDs.
            for id in std::mem::take(&mut self.storage_state.dropped_ids) {
                self.send_storage_response(&response_tx, StorageResponse::DroppedId(id));
            }

            self.process_oneshot_ingestions(&response_tx);

            self.report_status_updates(&response_tx);

            if last_stats_time.elapsed() >= stats_interval {
                self.report_storage_statistics(&response_tx);
                last_stats_time = Instant::now();
            }

            // Handle any commands received from the controller.
            loop {
                match command_rx.try_recv() {
                    Ok(cmd) => self.storage_state.handle_storage_command(cmd),
                    Err(TryRecvError::Empty) => break,
                    Err(TryRecvError::Disconnected) => {
                        disconnected = true;
                        break;
                    }
                }
            }

            // Handle responses from the async worker.
            while let Ok(response) = self.storage_state.async_worker.try_recv() {
                self.handle_async_worker_response(response);
            }

            // Handle any received internal commands.
            while let Some(command) = self.storage_state.internal_cmd_rx.try_recv() {
                self.handle_internal_storage_command(command);
            }
        }
    }

    /// Entry point for applying a response from the async storage worker.
    pub fn handle_async_worker_response(
        &self,
        async_response: AsyncStorageWorkerResponse<mz_repr::Timestamp>,
    ) {
        // NOTE: If we want to share the load of async processing we
        // have to change `handle_storage_command` and change this
        // assert.
        assert_eq!(
            self.timely_worker.index(),
            0,
            "only worker #0 is doing async processing"
        );
        match async_response {
            AsyncStorageWorkerResponse::FrontiersUpdated {
                id,
                ingestion_description,
                as_of,
                resume_uppers,
                source_resume_uppers,
            } => {
                self.storage_state.internal_cmd_tx.send(
                    InternalStorageCommand::CreateIngestionDataflow {
                        id,
                        ingestion_description,
                        as_of,
                        resume_uppers,
                        source_resume_uppers,
                    },
                );
            }
            AsyncStorageWorkerResponse::DropDataflow(id) => {
                self.storage_state
                    .internal_cmd_tx
                    .send(InternalStorageCommand::DropDataflow(vec![id]));
            }
        }
    }

    /// Entry point for applying an internal storage command.
    pub fn handle_internal_storage_command(&mut self, internal_cmd: InternalStorageCommand) {
        match internal_cmd {
            InternalStorageCommand::SuspendAndRestart { id, reason } => {
                info!(
                    "worker {}/{} initiating suspend-and-restart for {id} because of: {reason}",
                    self.timely_worker.index(),
                    self.timely_worker.peers(),
                );

                let maybe_ingestion = self.storage_state.ingestions.get(&id).cloned();
                if let Some(ingestion_description) = maybe_ingestion {
                    // Yank the token of the previously existing source dataflow. Note that
                    // this token also includes any source exports/subsources.
                    let maybe_token = self.storage_state.source_tokens.remove(&id);
                    if maybe_token.is_none() {
                        // Something has dropped the source. Make sure we don't
                        // accidentally re-create it.
                        return;
                    }

                    // This needs to be done by one worker, which will
                    // broadcast a `CreateIngestionDataflow` command to all
                    // workers based on the response that contains the
                    // resumption upper.
                    //
                    // Doing this separately on each worker could lead to
                    // differing resume_uppers which might lead to all kinds of
                    // mayhem.
                    //
                    // TODO(aljoscha): If we ever become worried that this is
                    // putting undue pressure on worker 0 we can pick the
                    // designated worker for a source/sink based on `id.hash()`.
                    if self.timely_worker.index() == 0 {
                        for (id, _) in ingestion_description.source_exports.iter() {
                            self.storage_state
                                .aggregated_statistics
                                .advance_global_epoch(*id);
                        }
                        self.storage_state
                            .async_worker
                            .update_frontiers(id, ingestion_description);
                    }

                    // Continue with other commands.
                    return;
                }

                let maybe_sink = self.storage_state.exports.get(&id).cloned();
                if let Some(sink_description) = maybe_sink {
                    // Yank the token of the previously existing sink
                    // dataflow.
                    let maybe_token = self.storage_state.sink_tokens.remove(&id);

                    if maybe_token.is_none() {
                        // Something has dropped the sink. Make sure we don't
                        // accidentally re-create it.
                        return;
                    }

                    // This needs to be broadcast by one worker and go through
                    // the internal command fabric, to ensure consistent
                    // ordering of dataflow rendering across all workers.
                    if self.timely_worker.index() == 0 {
                        self.storage_state
                            .aggregated_statistics
                            .advance_global_epoch(id);
                        self.storage_state.internal_cmd_tx.send(
                            InternalStorageCommand::RunSinkDataflow(id, sink_description),
                        );
                    }

                    // Continue with other commands.
                    return;
                }

                if !self
                    .storage_state
                    .ingestions
                    .values()
                    .any(|v| v.source_exports.contains_key(&id))
                {
                    // Our current approach to dropping a source results in a race between shard
                    // finalization (which happens in the controller) and dataflow shutdown (which
                    // happens in clusterd). If a source is created and dropped fast enough, or the
                    // two commands get sufficiently delayed, then it's possible to receive a
                    // SuspendAndRestart command for an unknown source. We cannot assert that this
                    // never happens, but we log a warning here to track how often it does.
                    warn!(
                        "got InternalStorageCommand::SuspendAndRestart for something that is not a source or sink: {id}"
                    );
                }
            }
            InternalStorageCommand::CreateIngestionDataflow {
                id: ingestion_id,
                mut ingestion_description,
                as_of,
                mut resume_uppers,
                mut source_resume_uppers,
            } => {
                info!(
                    ?as_of,
                    ?resume_uppers,
                    "worker {}/{} trying to (re-)start ingestion {ingestion_id}",
                    self.timely_worker.index(),
                    self.timely_worker.peers(),
                );

                // We initialize statistics before we prune finished exports. We
                // still want to export statistics for these, plus the rendering
                // machinery will get confused if there are not at least
                // statistics for the "main" source.
                for (export_id, export) in ingestion_description.source_exports.iter() {
                    let resume_upper = resume_uppers[export_id].clone();
                    self.storage_state.aggregated_statistics.initialize_source(
                        *export_id,
                        resume_upper.clone(),
                        || {
                            SourceStatistics::new(
                                *export_id,
                                self.storage_state.timely_worker_index,
                                &self.storage_state.metrics.source_statistics,
                                ingestion_id,
                                &export.storage_metadata.data_shard,
                                export.data_config.envelope.clone(),
                                resume_upper,
                            )
                        },
                    );
                }

                let finished_exports: BTreeSet<GlobalId> = resume_uppers
                    .iter()
                    .filter(|(_, frontier)| frontier.is_empty())
                    .map(|(id, _)| *id)
                    .collect();

                resume_uppers.retain(|id, _| !finished_exports.contains(id));
                source_resume_uppers.retain(|id, _| !finished_exports.contains(id));
                ingestion_description
                    .source_exports
                    .retain(|id, _| !finished_exports.contains(id));

                for id in ingestion_description.collection_ids() {
                    // If there is already a shared upper, we re-use it, to make
                    // sure that parties that are already using the shared upper
                    // can continue doing so.
                    let source_upper = self
                        .storage_state
                        .source_uppers
                        .entry(id.clone())
                        .or_insert_with(|| {
                            Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())))
                        });

                    let mut source_upper = source_upper.borrow_mut();
                    if !source_upper.is_empty() {
                        source_upper.clear();
                        source_upper.insert(mz_repr::Timestamp::minimum());
                    }
                }

                // If all subsources of the source are finished, we can skip rendering entirely.
                // Also, if `as_of` is empty, the dataflow has been finalized, so we can skip it as
                // well.
                //
                // TODO(guswynn|petrosagg): this is a bit hacky, and is a consequence of storage state
                // management being a bit of a mess. we should clean this up and remove weird if
                // statements like this.
                if resume_uppers.values().all(|frontier| frontier.is_empty()) || as_of.is_empty() {
                    tracing::info!(
                        ?resume_uppers,
                        ?as_of,
                        "worker {}/{} skipping building ingestion dataflow \
                        for {ingestion_id} because the ingestion is finished",
                        self.timely_worker.index(),
                        self.timely_worker.peers(),
                    );
                    return;
                }

                crate::render::build_ingestion_dataflow(
                    self.timely_worker,
                    &mut self.storage_state,
                    ingestion_id,
                    ingestion_description,
                    as_of,
                    resume_uppers,
                    source_resume_uppers,
                );
            }
            InternalStorageCommand::RunOneshotIngestion {
                ingestion_id,
                collection_id,
                collection_meta,
                request,
            } => {
                crate::render::build_oneshot_ingestion_dataflow(
                    self.timely_worker,
                    &mut self.storage_state,
                    ingestion_id,
                    collection_id,
                    collection_meta,
                    request,
                );
            }
            InternalStorageCommand::RunSinkDataflow(sink_id, sink_description) => {
                info!(
                    "worker {}/{} trying to (re-)start sink {sink_id}",
                    self.timely_worker.index(),
                    self.timely_worker.peers(),
                );

                {
                    // If there is already a shared write frontier, we re-use it, to
                    // make sure that parties that are already using the shared
                    // frontier can continue doing so.
                    let sink_write_frontier = self
                        .storage_state
                        .sink_write_frontiers
                        .entry(sink_id.clone())
                        .or_insert_with(|| Rc::new(RefCell::new(Antichain::new())));

                    let mut sink_write_frontier = sink_write_frontier.borrow_mut();
                    sink_write_frontier.clear();
                    sink_write_frontier.insert(mz_repr::Timestamp::minimum());
                }
                self.storage_state
                    .aggregated_statistics
                    .initialize_sink(sink_id, || {
                        SinkStatistics::new(
                            sink_id,
                            self.storage_state.timely_worker_index,
                            &self.storage_state.metrics.sink_statistics,
                        )
                    });

                crate::render::build_export_dataflow(
                    self.timely_worker,
                    &mut self.storage_state,
                    sink_id,
                    sink_description,
                );
            }
            InternalStorageCommand::DropDataflow(ids) => {
                for id in &ids {
                    // Clean up per-source / per-sink state.
                    self.storage_state.source_uppers.remove(id);
                    self.storage_state.source_tokens.remove(id);

                    self.storage_state.sink_tokens.remove(id);

                    self.storage_state.aggregated_statistics.deinitialize(*id);
                }
            }
            InternalStorageCommand::UpdateConfiguration { storage_parameters } => {
                self.storage_state
                    .dataflow_parameters
                    .update(storage_parameters.clone());
                self.storage_state
                    .storage_configuration
                    .update(storage_parameters);

                // Clear out the updates as we no longer forward them to anyone else to process.
                // We clone `StorageState::storage_configuration` many times during rendering
                // and want to avoid cloning these unused updates.
                self.storage_state
                    .storage_configuration
                    .parameters
                    .dyncfg_updates = Default::default();

                // Remember the maintenance interval locally to avoid reading it from the config set on
                // every server iteration.
                self.storage_state.server_maintenance_interval =
                    STORAGE_SERVER_MAINTENANCE_INTERVAL
                        .get(self.storage_state.storage_configuration.config_set());
            }
            InternalStorageCommand::StatisticsUpdate { sources, sinks } => self
                .storage_state
                .aggregated_statistics
                .ingest(sources, sinks),
        }
    }

    /// Emit information about write frontier progress, along with information that should
    /// be made durable for this to be the case.
    ///
    /// The write frontier progress is "conditional" in that it is not until the information is made
    /// durable that the data are emitted to downstream workers, and indeed they should not rely on
    /// the completeness of what they hear until the information is made durable.
    ///
    /// Specifically, this sends information about new timestamp bindings created by dataflow workers,
    /// with the understanding that, if made durable (and ack'd back to the workers), the source will
    /// in fact progress with this write frontier.
    pub fn report_frontier_progress(&mut self, response_tx: &ResponseSender) {
        let mut new_uppers = Vec::new();

        // Check if any observed frontier should advance the reported frontiers.
        for (id, frontier) in self
            .storage_state
            .source_uppers
            .iter()
            .chain(self.storage_state.sink_write_frontiers.iter())
        {
            let Some(reported_frontier) = self.storage_state.reported_frontiers.get_mut(id) else {
                // Frontier reporting has not yet been started for this object.
                // Potentially because this timely worker has not yet seen the
                // `CreateSources` command.
                continue;
            };

            let observed_frontier = frontier.borrow();

            // Only do a thing if it *advances* the frontier, not just *changes* the frontier.
            // This is protection against `frontier` lagging behind what we have conditionally reported.
            if PartialOrder::less_than(reported_frontier, &observed_frontier) {
                new_uppers.push((*id, observed_frontier.clone()));
                reported_frontier.clone_from(&observed_frontier);
            }
        }

        for (id, upper) in new_uppers {
            self.send_storage_response(response_tx, StorageResponse::FrontierUpper(id, upper));
        }
    }

    /// Pumps latest status updates from the buffer shared with operators and
    /// reports any updates that need reporting.
    pub fn report_status_updates(&mut self, response_tx: &ResponseSender) {
        // If we haven't done the initial status report, report all current statuses
        if !self.storage_state.initial_status_reported {
            // We pull initially reported status updates to "now", so that they
            // sort as the latest update in internal status collections. This
            // makes it so that a newly bootstrapped envd can append status
            // updates to internal status collections that report an accurate
            // view as of the time when they came up.
            let now_ts = mz_ore::now::to_datetime((self.storage_state.now)());
            let status_updates = self
                .storage_state
                .latest_status_updates
                .values()
                .cloned()
                .map(|mut update| {
                    update.timestamp = now_ts.clone();
                    update
                });
            for update in status_updates {
                self.send_storage_response(response_tx, StorageResponse::StatusUpdate(update));
            }
            self.storage_state.initial_status_reported = true;
        }

        // Pump updates into our state and stage them for reporting.
        for shared_update in self.storage_state.shared_status_updates.take() {
            self.send_storage_response(
                response_tx,
                StorageResponse::StatusUpdate(shared_update.clone()),
            );

            self.storage_state
                .latest_status_updates
                .insert(shared_update.id, shared_update);
        }
    }

    /// Report source statistics back to the controller.
    pub fn report_storage_statistics(&mut self, response_tx: &ResponseSender) {
        let (sources, sinks) = self.storage_state.aggregated_statistics.emit_local();
        if !sources.is_empty() || !sinks.is_empty() {
            self.storage_state
                .internal_cmd_tx
                .send(InternalStorageCommand::StatisticsUpdate { sources, sinks })
        }

        let (sources, sinks) = self.storage_state.aggregated_statistics.snapshot();
        if !sources.is_empty() || !sinks.is_empty() {
            self.send_storage_response(
                response_tx,
                StorageResponse::StatisticsUpdates(sources, sinks),
            );
        }
    }

    /// Send a response to the coordinator.
    fn send_storage_response(&self, response_tx: &ResponseSender, response: StorageResponse) {
        // Ignore send errors because the coordinator is free to ignore our
        // responses. This happens during shutdown.
        let _ = response_tx.send(response);
    }

    fn process_oneshot_ingestions(&mut self, response_tx: &ResponseSender) {
        use tokio::sync::mpsc::error::TryRecvError;

        let mut to_remove = vec![];

        for (ingestion_id, ingestion_state) in &mut self.storage_state.oneshot_ingestions {
            loop {
                match ingestion_state.results.try_recv() {
                    Ok(result) => {
                        let response = match result {
                            Ok(maybe_batch) => maybe_batch.into_iter().map(Result::Ok).collect(),
                            Err(err) => vec![Err(err)],
                        };
                        let staged_batches = BTreeMap::from([(*ingestion_id, response)]);
                        let _ = response_tx.send(StorageResponse::StagedBatches(staged_batches));
                    }
                    Err(TryRecvError::Empty) => {
                        break;
                    }
                    Err(TryRecvError::Disconnected) => {
                        to_remove.push(*ingestion_id);
                        break;
                    }
                }
            }
        }

        for ingestion_id in to_remove {
            tracing::info!(?ingestion_id, "removing oneshot ingestion");
            self.storage_state.oneshot_ingestions.remove(&ingestion_id);
        }
    }

    /// Extract commands until `InitializationComplete`, and make the worker
    /// reflect those commands. If the worker cannot be made to reflect the
    /// commands, return an error.
    fn reconcile(&mut self, command_rx: &CommandReceiver) -> Result<(), RecvError> {
        let worker_id = self.timely_worker.index();

        // To initialize the connection, we want to drain all commands until we
        // receive a `StorageCommand::InitializationComplete` command to form a
        // target command state.
        let mut commands = vec![];
        loop {
            match command_rx.recv()? {
                StorageCommand::InitializationComplete => break,
                command => commands.push(command),
            }
        }

        // Track which frontiers this envd expects; we will also set their
        // initial timestamp to the minimum timestamp to reset them as we don't
        // know what frontiers the new envd expects.
        let mut expected_objects = BTreeSet::new();

        let mut drop_commands = BTreeSet::new();
        let mut running_ingestion_descriptions = self.storage_state.ingestions.clone();
        let mut running_exports_descriptions = self.storage_state.exports.clone();

        let mut create_oneshot_ingestions: BTreeSet<Uuid> = BTreeSet::new();
        let mut cancel_oneshot_ingestions: BTreeSet<Uuid> = BTreeSet::new();

        for command in &mut commands {
            match command {
                StorageCommand::Hello { .. } => {
                    panic!("Hello must be captured before")
                }
                StorageCommand::AllowCompaction(id, since) => {
                    info!(%worker_id, ?id, ?since, "reconcile: received AllowCompaction command");

                    // Collect all "drop commands". These are `AllowCompaction`
                    // commands that compact to the empty since. Then, later, we make sure
                    // we retain only those `Create*` commands that are not dropped. We
                    // assume that the `AllowCompaction` command is ordered after the
                    // `Create*` commands but don't assert that.
                    // WIP: Should we assert?
                    if since.is_empty() {
                        drop_commands.insert(*id);
                    }
                }
                StorageCommand::RunIngestion(ingestion) => {
                    info!(%worker_id, ?ingestion, "reconcile: received RunIngestion command");

                    // Ensure that ingestions are forward-rolling alter compatible.
                    let prev = running_ingestion_descriptions
                        .insert(ingestion.id, ingestion.description.clone());

                    if let Some(prev_ingest) = prev {
                        // If the new ingestion is not exactly equal to the currently running
                        // ingestion, we must either track that we need to synthesize an update
                        // command to change the ingestion, or panic.
                        prev_ingest
                            .alter_compatible(ingestion.id, &ingestion.description)
                            .expect("only alter compatible ingestions permitted");
                    }
                }
                StorageCommand::RunSink(export) => {
                    info!(%worker_id, ?export, "reconcile: received RunSink command");

                    // Ensure that exports are forward-rolling alter compatible.
                    let prev =
                        running_exports_descriptions.insert(export.id, export.description.clone());

                    if let Some(prev_export) = prev {
                        prev_export
                            .alter_compatible(export.id, &export.description)
                            .expect("only alter compatible exports permitted");
                    }
                }
                StorageCommand::RunOneshotIngestion(ingestion) => {
                    info!(%worker_id, ?ingestion, "reconcile: received RunOneshotIngestion command");
                    create_oneshot_ingestions.insert(ingestion.ingestion_id);
                }
                StorageCommand::CancelOneshotIngestion(uuid) => {
                    info!(%worker_id, %uuid, "reconcile: received CancelOneshotIngestion command");
                    cancel_oneshot_ingestions.insert(*uuid);
                }
                StorageCommand::InitializationComplete
                | StorageCommand::AllowWrites
                | StorageCommand::UpdateConfiguration(_) => (),
            }
        }

        let mut seen_most_recent_definition = BTreeSet::new();

        // We iterate over this backward to ensure that we keep only the most recent ingestion
        // description.
        let mut filtered_commands = VecDeque::new();
        for mut command in commands.into_iter().rev() {
            let mut should_keep = true;
            match &mut command {
                StorageCommand::Hello { .. } => {
                    panic!("Hello must be captured before")
                }
                StorageCommand::RunIngestion(ingestion) => {
                    // Subsources can be dropped independently of their
                    // primary source, so we evaluate them in a separate
                    // loop.
                    for export_id in ingestion
                        .description
                        .source_exports
                        .keys()
                        .filter(|export_id| **export_id != ingestion.id)
                    {
                        if drop_commands.remove(export_id) {
                            info!(%worker_id, %export_id, "reconcile: dropping subsource");
                            self.storage_state.dropped_ids.push(*export_id);
                        }
                    }

                    if drop_commands.remove(&ingestion.id)
                        || self.storage_state.dropped_ids.contains(&ingestion.id)
                    {
                        info!(%worker_id, %ingestion.id, "reconcile: dropping ingestion");

                        // If an ingestion is dropped, so too must all of
                        // its subsources (i.e. ingestion exports, as well
                        // as its progress subsource).
                        for id in ingestion.description.collection_ids() {
                            drop_commands.remove(&id);
                            self.storage_state.dropped_ids.push(id);
                        }
                        should_keep = false;
                    } else {
                        let most_recent_definition =
                            seen_most_recent_definition.insert(ingestion.id);

                        if most_recent_definition {
                            // If this is the most recent definition, this
                            // is what we will be running when
                            // reconciliation completes. This definition
                            // must not include any dropped subsources.
                            ingestion.description.source_exports.retain(|export_id, _| {
                                !self.storage_state.dropped_ids.contains(export_id)
                            });

                            // After clearing any dropped subsources, we can
                            // state that we expect all of these to exist.
                            expected_objects.extend(ingestion.description.collection_ids());
                        }

                        let running_ingestion = self.storage_state.ingestions.get(&ingestion.id);

                        // We keep only:
                        // - The most recent version of the ingestion, which
                        //   is why these commands are run in reverse.
                        // - Ingestions whose descriptions are not exactly
                        //   those that are currently running.
                        should_keep = most_recent_definition
                            && running_ingestion != Some(&ingestion.description)
                    }
                }
1146                StorageCommand::RunSink(export) => {
1147                    if drop_commands.remove(&export.id)
1148                        // If there were multiple `RunSink` commands for this
1149                        // sink in the command stream, we want to ensure none
1150                        // of them are retained.
1151                        || self.storage_state.dropped_ids.contains(&export.id)
1152                    {
1153                        info!(%worker_id, %export.id, "reconcile: dropping sink");
1154
1155                        // Make sure that we report back that the ID was
1156                        // dropped.
1157                        self.storage_state.dropped_ids.push(export.id);
1158
1159                        should_keep = false;
1160                    } else {
1161                        expected_objects.insert(export.id);
1162
1163                        let running_sink = self.storage_state.exports.get(&export.id);
1164
1165                        // We keep only:
1166                        // - The most recent version of the sink, which
1167                        //   is why these commands are run in reverse.
1168                        // - Sinks whose descriptions are not exactly
1169                        //   those that are currently running.
1170                        should_keep = seen_most_recent_definition.insert(export.id)
1171                            && running_sink != Some(&export.description);
1172                    }
1173                }
1174                StorageCommand::RunOneshotIngestion(ingestion) => {
1175                    let already_running = self
1176                        .storage_state
1177                        .oneshot_ingestions
1178                        .contains_key(&ingestion.ingestion_id);
1179                    let was_canceled = cancel_oneshot_ingestions.contains(&ingestion.ingestion_id);
1180
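                    // Keep the command only if this oneshot ingestion is not
                    // already running and was not canceled elsewhere in the
                    // command stream.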
1181                    should_keep = !already_running && !was_canceled;
1182                }
1183                StorageCommand::CancelOneshotIngestion(ingestion_id) => {
1184                    let already_running = self
1185                        .storage_state
1186                        .oneshot_ingestions
1187                        .contains_key(ingestion_id);
1188                    should_keep = already_running;
1189                }
1190                StorageCommand::InitializationComplete
1191                | StorageCommand::AllowWrites
1192                | StorageCommand::UpdateConfiguration(_)
1193                | StorageCommand::AllowCompaction(_, _) => (),
1194            }
1195            if should_keep {
1196                filtered_commands.push_front(command);
1197            }
1198        }
1199        let commands = filtered_commands;
1200
1201        // Make sure all the "drop commands" matched up with a source or sink.
1202        // This is also what the regular handler logic for `AllowCompaction`
1203        // would do.
1204        soft_assert_or_log!(
1205            drop_commands.is_empty(),
1206            "AllowCompaction commands for non-existent IDs {:?}",
1207            drop_commands
1208        );
1209
1210        // Determine the IDs of all objects we did _not_ see; these are
1211        // considered stale.
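        // For example, an ingestion still tracked in `self.storage_state.ingestions`
        // for which no `RunIngestion` command survived the filtering above is
        // stale and will be dropped below.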
1212        let stale_objects = self
1213            .storage_state
1214            .ingestions
1215            .values()
1216            .map(|i| i.collection_ids())
1217            .flatten()
1218            .chain(self.storage_state.exports.keys().copied())
1219            // Objects are considered stale if we did not see them re-created.
1220            .filter(|id| !expected_objects.contains(id))
1221            .collect::<Vec<_>>();
1222        let stale_oneshot_ingestions = self
1223            .storage_state
1224            .oneshot_ingestions
1225            .keys()
1226            .filter(|ingestion_id| {
1227                let created = create_oneshot_ingestions.contains(ingestion_id);
1228                let dropped = cancel_oneshot_ingestions.contains(ingestion_id);
1229                mz_ore::soft_assert_or_log!(
1230                    !created && dropped,
1231                    "dropped non-existent oneshot source"
1232                );
1233                !created && !dropped
1234            })
1235            .copied()
1236            .collect::<Vec<_>>();
1237
1238        info!(
1239            %worker_id, ?expected_objects, ?stale_objects, ?stale_oneshot_ingestions,
1240            "reconcile: modifying storage state to match expected objects",
1241        );
1242
1243        for id in stale_objects {
1244            self.storage_state.drop_collection(id);
1245        }
1246        for id in stale_oneshot_ingestions {
1247            self.storage_state.drop_oneshot_ingestion(id);
1248        }
1249
1250        // Do not report dropping any objects that do not belong to expected
1251        // objects.
1252        self.storage_state
1253            .dropped_ids
1254            .retain(|id| expected_objects.contains(id));
1255
1256        // Do not report any frontiers that do not belong to expected objects.
1257        // Note that this set of objects can differ from the set of sources and
1258        // sinks.
1259        self.storage_state
1260            .reported_frontiers
1261            .retain(|id, _| expected_objects.contains(id));
1262
1263        // Reset the reported frontiers for the remaining objects.
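        // Starting from the minimum means that subsequent frontier reporting
        // treats these objects as new, so the (re)connected client receives
        // fresh frontier updates rather than deltas relative to what an
        // earlier client had already seen.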
1264        for (_, frontier) in &mut self.storage_state.reported_frontiers {
1265            *frontier = Antichain::from_elem(<_>::minimum());
1266        }
1267
1268        // Reset the initial status reported flag when a new client connects
1269        self.storage_state.initial_status_reported = false;
1270
1271        // Execute the modified commands.
1272        for command in commands {
1273            self.storage_state.handle_storage_command(command);
1274        }
1275
1276        Ok(())
1277    }
1278}
1279
1280impl StorageState {
1281    /// Entry point for applying a storage command.
1282    ///
1283    /// NOTE: This does not have access to the timely worker and therefore
1284    /// cannot render dataflows. For dataflow rendering, this needs to either
1285    /// send asynchronous commands to the `async_worker` or internal
1286    /// commands via the `internal_cmd_tx`.
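    ///
    /// A minimal sketch of how a worker loop might apply external commands
    /// (`command_rx` and the surrounding loop are hypothetical, not the actual
    /// call site):
    ///
    /// ```ignore
    /// while let Ok(cmd) = command_rx.try_recv() {
    ///     // `Hello` is handled during (re)connection, before we get here.
    ///     storage_state.handle_storage_command(cmd);
    /// }
    /// ```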
1287    pub fn handle_storage_command(&mut self, cmd: StorageCommand) {
1288        match cmd {
1289            StorageCommand::Hello { .. } => panic!("Hello must be captured before"),
1290            StorageCommand::InitializationComplete => (),
1291            StorageCommand::AllowWrites => {
1292                self.read_only_tx
1293                    .send(false)
1294                    .expect("we're holding one other end");
1295                self.persist_clients.cfg().enable_compaction();
1296            }
1297            StorageCommand::UpdateConfiguration(params) => {
1298                // These can be done from all workers safely.
1299                tracing::info!("Applying configuration update: {params:?}");
1300
1301                // We serialize the dyncfg updates in StorageParameters, but configure
1302                // persist separately.
1303                self.persist_clients
1304                    .cfg()
1305                    .apply_from(&params.dyncfg_updates);
1306
1307                params.tracing.apply(self.tracing_handle.as_ref());
1308
1309                if let Some(log_filter) = &params.tracing.log_filter {
1310                    self.storage_configuration
1311                        .connection_context
1312                        .librdkafka_log_level =
1313                        mz_ore::tracing::crate_level(&log_filter.clone().into(), "librdkafka");
1314                }
1315
1316                // This needs to be broadcast by one worker and go through
1317                // the internal command fabric, to ensure consistent
1318                // ordering of dataflow rendering across all workers.
1319                if self.timely_worker_index == 0 {
1320                    self.internal_cmd_tx
1321                        .send(InternalStorageCommand::UpdateConfiguration {
1322                            storage_parameters: *params,
1323                        });
1324                }
1325            }
1326            StorageCommand::RunIngestion(ingestion) => {
1327                let RunIngestionCommand { id, description } = *ingestion;
1328
1329                // Remember the ingestion description to facilitate possible
1330                // reconciliation later.
1331                self.ingestions.insert(id, description.clone());
1332
1333                // Initialize shared frontier reporting.
1334                for id in description.collection_ids() {
1335                    self.reported_frontiers
1336                        .entry(id)
1337                        .or_insert_with(|| Antichain::from_elem(mz_repr::Timestamp::minimum()));
1338                }
1339
1340                // This needs to be done by one worker, which will broadcast a
1341                // `CreateIngestionDataflow` command to all workers based on the response that
1342                // contains the resumption upper.
1343                //
1344                // Doing this separately on each worker could lead to differing resume_uppers
1345                // which might lead to all kinds of mayhem.
1346                //
1347                // n.b. the ingestion on each worker uses the description from worker 0, not
1348                // the ingestion in the local storage state. This is something we might want
1349                // to fix in the future; see materialize#19907.
1350                if self.timely_worker_index == 0 {
1351                    self.async_worker.update_frontiers(id, description);
1352                }
1353            }
1354            StorageCommand::RunOneshotIngestion(oneshot) => {
1355                if self.timely_worker_index == 0 {
1356                    self.internal_cmd_tx
1357                        .send(InternalStorageCommand::RunOneshotIngestion {
1358                            ingestion_id: oneshot.ingestion_id,
1359                            collection_id: oneshot.collection_id,
1360                            collection_meta: oneshot.collection_meta,
1361                            request: oneshot.request,
1362                        });
1363                }
1364            }
1365            StorageCommand::CancelOneshotIngestion(id) => {
1366                self.drop_oneshot_ingestion(id);
1367            }
1368            StorageCommand::RunSink(export) => {
1369                // Remember the sink description to facilitate possible
1370                // reconciliation later.
1371                let prev = self.exports.insert(export.id, export.description.clone());
1372
1373                // New sink, add state.
1374                if prev.is_none() {
1375                    self.reported_frontiers.insert(
1376                        export.id,
1377                        Antichain::from_elem(mz_repr::Timestamp::minimum()),
1378                    );
1379                }
1380
1381                // This needs to be broadcast by one worker and go through the internal command
1382                // fabric, to ensure consistent ordering of dataflow rendering across all
1383                // workers.
1384                if self.timely_worker_index == 0 {
1385                    self.internal_cmd_tx
1386                        .send(InternalStorageCommand::RunSinkDataflow(
1387                            export.id,
1388                            export.description,
1389                        ));
1390                }
1391            }
1392            StorageCommand::AllowCompaction(id, frontier) => {
1393                match self.exports.get_mut(&id) {
1394                    Some(export_description) => {
1395                        // Update our knowledge of the `as_of`, in case we need to internally
1396                        // restart a sink in the future.
1397                        export_description.as_of.clone_from(&frontier);
1398                    }
1399                    // reported_frontiers contains both ingestions and their
1400                    // exports
1401                    None if self.reported_frontiers.contains_key(&id) => (),
1402                    None => {
1403                        soft_panic_or_log!("AllowCompaction command for non-existent {id}");
1404                    }
1405                }
1406
1407                if frontier.is_empty() {
1408                    // Indicates that we may drop `id`, as there are no more valid times to read.
1409                    self.drop_collection(id);
1410                }
1411            }
1412        }
1413    }
1414
1415    /// Drop the identified storage collection from the storage state.
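    ///
    /// This removes the collection from the ingestion/export maps, stops
    /// frontier reporting for it and, if frontiers were still being reported,
    /// records the ID in `dropped_ids` so the drop is relayed to the client.
    /// On worker 0 it also instructs the async worker to drop the
    /// corresponding dataflow.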
1416    fn drop_collection(&mut self, id: GlobalId) {
1417        fail_point!("crash_on_drop");
1418
1419        self.ingestions.remove(&id);
1420        self.exports.remove(&id);
1421
1422        let _ = self.latest_status_updates.remove(&id);
1423
1424        // This will stop reporting of frontiers.
1425        //
1426        // If this object still has its frontiers reported, we will notify the
1427        // client envd of the drop.
1428        if self.reported_frontiers.remove(&id).is_some() {
1429            // The only actions left are internal cleanup, so we can commit to
1430            // the client that these objects have been dropped.
1431            //
1432            // This must be done now rather than in response to `DropDataflow`,
1433            // otherwise we introduce the possibility of a timing issue where:
1434            // - We remove all tracking state from the storage state and send
1435            //   `DropDataflow` (i.e. this block).
1436            // - While waiting to process that command, we reconcile with a new
1437            //   envd. That envd has already committed to its catalog that this
1438            //   object no longer exists.
1439            // - We process the `DropDataflow` command, and identify that this
1440            //   object has been dropped.
1441            // - The next time `dropped_ids` is processed, we send a response
1442            //   that this ID has been dropped, but the upstream state has no
1443            //   record of that object having ever existed.
1444            self.dropped_ids.push(id);
1445        }
1446
1447        // Send this through the async worker to ensure correct ordering with
1448        // RunIngestion; the dataflow itself is dropped on the async worker response.
1449        if self.timely_worker_index == 0 {
1450            self.async_worker.drop_dataflow(id);
1451        }
1452    }
1453
1454    /// Drop the identified oneshot ingestion from the storage state.
1455    fn drop_oneshot_ingestion(&mut self, ingestion_id: uuid::Uuid) {
1456        let prev = self.oneshot_ingestions.remove(&ingestion_id);
1457        tracing::info!(%ingestion_id, existed = %prev.is_some(), "dropping oneshot ingestion");
1458    }
1459}