mz_storage/storage_state.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

//! Worker-local state for storage timely instances.
//!
//! One instance of a [`Worker`], along with its contained [`StorageState`], is
//! part of an ensemble of storage workers that all run inside the same timely
//! cluster. We call this worker a _storage worker_ to disambiguate it from
//! other kinds of workers, such as other components that might be sharing the
//! same timely cluster.
//!
//! ## Controller and internal communication
//!
//! A worker receives _external_ [`StorageCommands`](StorageCommand) from the
//! storage controller via a channel. Storage workers also share an _internal_
//! control/command fabric ([`internal_control`]). Internal commands go through
//! a sequencer dataflow that ensures that all workers receive all commands in
//! the same consistent order.
//!
//! We need to make sure that commands that cause dataflows to be rendered are
//! processed in the same consistent order across all workers because timely
//! requires this. To achieve this, we make sure that only internal commands can
//! cause dataflows to be rendered. External commands (from the controller)
//! cause internal commands to be broadcast (by only one worker), to get
//! dataflows rendered.
//!
//! The internal command fabric is also used to broadcast messages from a local
//! operator/worker to all workers. For example, when we need to tear down and
//! restart a dataflow on all workers because an error was encountered.
//!
//! ## Async Storage Worker
//!
//! The storage worker has a companion [`AsyncStorageWorker`] that must be used
//! when running code that requires `async`. This is needed because a timely
//! main loop cannot run `async` code.
//!
//! ## Example flow of commands for `RunIngestion`
//!
//! With external commands, internal commands, and the async worker,
//! understanding where and how commands from the controller are realized can
//! get complicated. We will follow the complete flow for `RunIngestion`, as an
//! example:
//!
//! 1. Worker receives a [`StorageCommand::RunIngestion`] command from the
//!    controller.
//! 2. This command is processed in [`StorageState::handle_storage_command`].
//!    This step cannot render dataflows, because it does not have access to the
//!    timely worker. It will only set up state that lives for the whole
//!    lifetime of the source, such as the `reported_frontier`. Putting in place
//!    this reported frontier will enable frontier reporting for that source. We
//!    will not start reporting when we only see an internal command for
//!    rendering a dataflow, which can "overtake" the external `RunIngestion`
//!    command.
//! 3. During processing of that command, we call
//!    [`AsyncStorageWorker::update_frontiers`], which causes a command to
//!    be sent to the async worker.
//! 4. We eventually get a response from the async worker:
//!    [`AsyncStorageWorkerResponse::FrontiersUpdated`].
//! 5. This response is handled in [`Worker::handle_async_worker_response`].
//! 6. Handling that response causes an
//!    [`InternalStorageCommand::CreateIngestionDataflow`] to be broadcast to
//!    all workers via the internal command fabric.
//! 7. This message will be processed (on each worker) in
//!    [`Worker::handle_internal_storage_command`]. This is what will cause the
//!    required dataflow to be rendered on all workers.
//!
//! The process described above assumes that the `RunIngestion` is _not_ an
//! update, i.e. it is in response to a `CREATE SOURCE`-like statement.
//!
//! The primary distinction when handling a `RunIngestion` that represents an
//! update is that it might fill out new internal state in the mid-level
//! clients on the way toward being run.
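//!
//! ## Example: broadcasting an internal command
//!
//! As a sketch of the internal command fabric described above (hypothetical
//! IDs and reason text, not code from this module), a worker-local operator
//! that wants all workers to tear down and restart an ingestion would send:
//!
//! ```ignore
//! storage_state.internal_cmd_tx.send(InternalStorageCommand::SuspendAndRestart {
//!     id: ingestion_id,
//!     reason: "source connection lost".to_string(),
//! });
//! ```
//!
//! The sequencer dataflow then delivers this command to every worker in the
//! same order, and each worker reacts to it in
//! [`Worker::handle_internal_storage_command`].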

use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use crossbeam_channel::{RecvError, TryRecvError};
use fail::fail_point;
use mz_ore::now::NowFn;
use mz_ore::tracing::TracingHandle;
use mz_ore::{soft_assert_or_log, soft_panic_or_log};
use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::cache::PersistClientCache;
use mz_repr::{GlobalId, Timestamp};
use mz_rocksdb::config::SharedWriteBufferManager;
use mz_storage_client::client::{
    RunIngestionCommand, StatusUpdate, StorageCommand, StorageResponse,
};
use mz_storage_types::AlterCompatible;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs::STORAGE_SERVER_MAINTENANCE_INTERVAL;
use mz_storage_types::oneshot_sources::OneshotIngestionDescription;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::IngestionDescription;
use mz_timely_util::builder_async::PressOnDropButton;
use mz_txn_wal::operator::TxnsContext;
use timely::communication::Allocate;
use timely::order::PartialOrder;
use timely::progress::Timestamp as _;
use timely::progress::frontier::Antichain;
use timely::worker::Worker as TimelyWorker;
use tokio::sync::{mpsc, watch};
use tokio::time::Instant;
use tracing::{info, warn};
use uuid::Uuid;

use crate::internal_control::{
    self, DataflowParameters, InternalCommandReceiver, InternalCommandSender,
    InternalStorageCommand,
};
use crate::metrics::StorageMetrics;
use crate::statistics::{AggregatedStatistics, SinkStatistics, SourceStatistics};
use crate::storage_state::async_storage_worker::{AsyncStorageWorker, AsyncStorageWorkerResponse};

pub mod async_storage_worker;

type CommandReceiver = crossbeam_channel::Receiver<StorageCommand>;
type ResponseSender = mpsc::UnboundedSender<StorageResponse>;

/// State maintained for each worker thread.
///
/// Much of this state can be viewed as local variables for the worker thread,
/// holding state that persists across function calls.
pub struct Worker<'w, A: Allocate> {
    /// The underlying Timely worker.
    ///
    /// NOTE: This is `pub` for testing.
    pub timely_worker: &'w mut TimelyWorker<A>,
    /// The channel over which communication handles for newly connected clients
    /// are delivered.
    pub client_rx: crossbeam_channel::Receiver<(CommandReceiver, ResponseSender)>,
    /// The state associated with collection ingress and egress.
    pub storage_state: StorageState,
}

impl<'w, A: Allocate> Worker<'w, A> {
    /// Creates new `Worker` state from the given components.
    pub fn new(
        timely_worker: &'w mut TimelyWorker<A>,
        client_rx: crossbeam_channel::Receiver<(CommandReceiver, ResponseSender)>,
        metrics: StorageMetrics,
        now: NowFn,
        connection_context: ConnectionContext,
        instance_context: StorageInstanceContext,
        persist_clients: Arc<PersistClientCache>,
        txns_ctx: TxnsContext,
        tracing_handle: Arc<TracingHandle>,
        shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
    ) -> Self {
        // It is very important that we only create the internal control
        // flow/command sequencer once because a) the worker state is re-used
        // when a new client connects and b) dataflows that have already been
        // rendered into the timely worker are reused as well.
        //
        // If we created a new sequencer every time we get a new client (likely
        // because the controller re-started and re-connected), dataflows that
        // were rendered before would still hold a handle to the old sequencer
        // but we would not read their commands anymore.
        let (internal_cmd_tx, internal_cmd_rx) =
            internal_control::setup_command_sequencer(timely_worker);

        let storage_configuration =
            StorageConfiguration::new(connection_context, mz_dyncfgs::all_dyncfgs());

        // We always initialize as read_only=true. Only when we're explicitly
        // allowed do we switch to doing writes.
        let (read_only_tx, read_only_rx) = watch::channel(true);

        // Similar to the internal command sequencer, it is very important that
        // we only create the async worker once because a) the worker state is
        // re-used when a new client connects and b) commands that have already
        // been sent and might yield a response will be lost if a new iteration
        // of `run_client` creates a new async worker.
        //
        // If we created a new async worker every time we get a new client
        // (likely because the controller re-started and re-connected), we can
        // get into an inconsistent state where we think that a dataflow has
        // been rendered, for example because there is an entry in
        // `StorageState::ingestions`, while there is not yet a dataflow. This
        // happens because the dataflow only gets rendered once we get a
        // response from the async worker and send off an internal command.
        //
        // The core idea is that both the sequencer and the async worker are
        // part of the per-worker state, and must be treated as such, meaning
        // they must survive between invocations of `run_client`.

        // TODO(aljoscha): This thread unparking business seems brittle, but
        // that's also how the command channel works currently. We can wrap it
        // inside a struct that holds both a channel and a `Thread`, but I don't
        // think that would help too much.
        let async_worker = async_storage_worker::AsyncStorageWorker::new(
            thread::current(),
            Arc::clone(&persist_clients),
        );
        let cluster_memory_limit = instance_context.cluster_memory_limit;

        let storage_state = StorageState {
            source_uppers: BTreeMap::new(),
            source_tokens: BTreeMap::new(),
            metrics,
            reported_frontiers: BTreeMap::new(),
            ingestions: BTreeMap::new(),
            exports: BTreeMap::new(),
            oneshot_ingestions: BTreeMap::new(),
            now,
            timely_worker_index: timely_worker.index(),
            timely_worker_peers: timely_worker.peers(),
            instance_context,
            persist_clients,
            txns_ctx,
            sink_tokens: BTreeMap::new(),
            sink_write_frontiers: BTreeMap::new(),
            dropped_ids: Vec::new(),
            aggregated_statistics: AggregatedStatistics::new(
                timely_worker.index(),
                timely_worker.peers(),
            ),
            shared_status_updates: Default::default(),
            latest_status_updates: Default::default(),
            initial_status_reported: Default::default(),
            internal_cmd_tx,
            internal_cmd_rx,
            read_only_tx,
            read_only_rx,
            async_worker,
            storage_configuration,
            dataflow_parameters: DataflowParameters::new(
                shared_rocksdb_write_buffer_manager,
                cluster_memory_limit,
            ),
            tracing_handle,
            server_maintenance_interval: Duration::ZERO,
        };

        // TODO(aljoscha): We might want `async_worker` and `internal_cmd_tx` to
        // be fields of `Worker` instead of `StorageState`, but at least for the
        // command flow sources and sinks need access to that. We can refactor
        // this once we have a clearer boundary between what sources/sinks need
        // and the full "power" of the internal command flow, which should stay
        // internal to the worker/not be exposed to source/sink implementations.
        Self {
            timely_worker,
            client_rx,
            storage_state,
        }
    }
}

/// Worker-local state related to the ingress or egress of collections of data.
pub struct StorageState {
    /// The highest observed upper frontier for each collection.
    ///
    /// This is shared among all source instances, so that they can jointly advance the
    /// frontier even as other instances are created and dropped. Ideally, the Storage
    /// module would eventually provide one source of truth on this rather than multiple,
    /// and we should aim for that but are not there yet.
    pub source_uppers: BTreeMap<GlobalId, Rc<RefCell<Antichain<mz_repr::Timestamp>>>>,
    /// Handles to created sources, keyed by ID.
    /// NB: The type of the tokens must not be changed to something other than `PressOnDropButton`
    /// to prevent usage of custom shutdown tokens that are tricky to get right.
    pub source_tokens: BTreeMap<GlobalId, Vec<PressOnDropButton>>,
    /// Metrics for storage objects.
    pub metrics: StorageMetrics,
    /// Tracks the conditional write frontiers we have reported.
    pub reported_frontiers: BTreeMap<GlobalId, Antichain<Timestamp>>,
    /// Descriptions of each installed ingestion.
    pub ingestions: BTreeMap<GlobalId, IngestionDescription<CollectionMetadata>>,
    /// Descriptions of each installed export.
    pub exports: BTreeMap<GlobalId, StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>>,
    /// Descriptions of oneshot ingestions that are currently running.
    pub oneshot_ingestions: BTreeMap<uuid::Uuid, OneshotIngestionDescription<ProtoBatch>>,
    /// A function that returns the current system time.
    pub now: NowFn,
    /// Index of the associated timely dataflow worker.
    pub timely_worker_index: usize,
    /// Peers in the associated timely dataflow worker.
    pub timely_worker_peers: usize,
    /// Other configuration for sources and sinks.
    pub instance_context: StorageInstanceContext,
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    /// This is intentionally shared between workers.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// Tokens that should be dropped when a dataflow is dropped to clean up
    /// associated state.
    /// NB: The type of the tokens must not be changed to something other than `PressOnDropButton`
    /// to prevent usage of custom shutdown tokens that are tricky to get right.
    pub sink_tokens: BTreeMap<GlobalId, Vec<PressOnDropButton>>,
    /// Frontier of sink writes (all subsequent writes will be at times at or
    /// beyond this frontier).
    pub sink_write_frontiers: BTreeMap<GlobalId, Rc<RefCell<Antichain<Timestamp>>>>,
    /// Collection ids that have been dropped but not yet reported as dropped.
    pub dropped_ids: Vec<GlobalId>,

    /// Statistics for sources and sinks.
    pub aggregated_statistics: AggregatedStatistics,

    /// A place shared with running dataflows, so that health operators can
    /// report status updates back to us.
    ///
    /// **NOTE**: Operators that append to this collection should take care to only add new
    /// status updates if the status of the ingestion/export in question has _changed_.
    pub shared_status_updates: Rc<RefCell<Vec<StatusUpdate>>>,

    /// The latest status update for each object.
    pub latest_status_updates: BTreeMap<GlobalId, StatusUpdate>,

    /// Whether we have reported the initial status after connecting to a new client.
    /// This is reset to false when a new client connects.
    pub initial_status_reported: bool,

    /// Sender for cluster-internal storage commands. These can be sent from
    /// within workers/operators and will be distributed to all workers. For
    /// example, for shutting down an entire dataflow from within an
    /// operator/worker.
    pub internal_cmd_tx: InternalCommandSender,
    /// Receiver for cluster-internal storage commands.
    pub internal_cmd_rx: InternalCommandReceiver,

    /// When this replica/cluster is in read-only mode it must not make any
    /// changes to external state. This flag can only be changed by a
    /// [StorageCommand::AllowWrites].
    ///
    /// Everything running on this replica/cluster must obey this flag. At the
    /// time of writing, nothing currently looks at this flag.
    /// TODO(benesch): fix this.
    ///
    /// NOTE: In the future, we might want a more complicated flag, for example
    /// something that tells us after which timestamp we are allowed to write.
    /// In this first version we are keeping things as simple as possible!
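    ///
    /// A consumer can observe the current value with `*read_only_rx.borrow()`
    /// or await changes via [`watch::Receiver::changed`] (a usage sketch, not
    /// a requirement imposed by this module).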
    pub read_only_rx: watch::Receiver<bool>,

    /// Send-side for read-only state.
    pub read_only_tx: watch::Sender<bool>,

    /// Async worker companion, used for running code that requires async, which
    /// the timely main loop cannot do.
    pub async_worker: AsyncStorageWorker<mz_repr::Timestamp>,

    /// Configuration for source and sink connections.
    pub storage_configuration: StorageConfiguration,
    /// Dynamically configurable parameters that control how dataflows are rendered.
    /// NOTE(guswynn): we should consider moving these into `storage_configuration`.
    pub dataflow_parameters: DataflowParameters,

    /// A process-global handle to tracing configuration.
    pub tracing_handle: Arc<TracingHandle>,

    /// Interval at which to perform server maintenance tasks. Set to a zero interval to
    /// perform maintenance with every `step_or_park` invocation.
    pub server_maintenance_interval: Duration,
}

/// Extra context for a storage instance.
/// This is extra information that is used when rendering sources
/// and sinks that is not tied to the source/connection configuration itself.
#[derive(Clone)]
pub struct StorageInstanceContext {
    /// A directory that can be used for scratch work.
    pub scratch_directory: Option<PathBuf>,
    /// A global `rocksdb::Env`, shared across ALL instances of `RocksDB` (even
    /// across sources!). This `Env` lets us control some resources (like background threads)
    /// process-wide.
    pub rocksdb_env: rocksdb::Env,
    /// The memory limit of the materialize cluster replica. This will
    /// be used to calculate and configure the maximum inflight bytes for backpressure.
    pub cluster_memory_limit: Option<usize>,
}

impl StorageInstanceContext {
    /// Build a new `StorageInstanceContext`.
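    ///
    /// A minimal sketch of a call site (hypothetical, not from this crate),
    /// for a replica with no scratch directory and no memory limit:
    ///
    /// ```ignore
    /// let ctx = StorageInstanceContext::new(None, None)?;
    /// ```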
    pub fn new(
        scratch_directory: Option<PathBuf>,
        cluster_memory_limit: Option<usize>,
    ) -> Result<Self, anyhow::Error> {
        Ok(Self {
            scratch_directory,
            rocksdb_env: rocksdb::Env::new()?,
            cluster_memory_limit,
        })
    }

    /// Constructs a new connection context for usage in tests.
    pub fn for_tests(rocksdb_env: rocksdb::Env) -> Self {
        Self {
            scratch_directory: None,
            rocksdb_env,
            cluster_memory_limit: None,
        }
    }
}

impl<'w, A: Allocate> Worker<'w, A> {
    /// Waits for client connections and runs them to completion.
    pub fn run(&mut self) {
        while let Ok((rx, tx)) = self.client_rx.recv() {
            self.run_client(rx, tx);
        }
    }

    /// Runs this (timely) storage worker until the given `command_rx` is
    /// disconnected.
    ///
    /// See the [module documentation](crate::storage_state) for this
    /// worker's responsibilities, how it communicates with the other workers and
    /// how commands flow from the controller and through the workers.
    fn run_client(&mut self, command_rx: CommandReceiver, response_tx: ResponseSender) {
        // At this point, all workers are still reading from the command flow.
        if self.reconcile(&command_rx).is_err() {
            return;
        }

        // The last time we reported statistics.
        let mut last_stats_time = Instant::now();

        // The last time we did periodic maintenance.
        let mut last_maintenance = std::time::Instant::now();

        let mut disconnected = false;
        while !disconnected {
            let config = &self.storage_state.storage_configuration;
            let stats_interval = config.parameters.statistics_collection_interval;

            let maintenance_interval = self.storage_state.server_maintenance_interval;

            let now = std::time::Instant::now();
            // Determine if we need to perform maintenance, which is true if
            // `maintenance_interval` time has passed since the last maintenance.
            let sleep_duration;
            if now >= last_maintenance + maintenance_interval {
                last_maintenance = now;
                sleep_duration = None;

                self.report_frontier_progress(&response_tx);
            } else {
                // We didn't perform maintenance, sleep until the next maintenance interval.
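                // For example, with a 1s maintenance interval and 400ms
                // elapsed since the last maintenance, we ask Timely below to
                // park for at most 600ms.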
                let next_maintenance = last_maintenance + maintenance_interval;
                sleep_duration = Some(next_maintenance.saturating_duration_since(now))
            }

            // Ask Timely to execute a unit of work.
            //
            // If there are no pending commands or responses from the async
            // worker, we ask Timely to park the thread if there's nothing to
            // do. We rely on another thread unparking us when there's new work
            // to be done, e.g., when sending a command or when new Kafka
            // messages have arrived.
            //
            // It is critical that we allow Timely to park iff there are no
            // pending commands or responses. The command may have already been
            // consumed by the call to `client_rx.recv`. See:
            // https://github.com/MaterializeInc/materialize/pull/13973#issuecomment-1200312212
            if command_rx.is_empty() && self.storage_state.async_worker.is_empty() {
                // Make sure we wake up again to report any pending statistics updates.
                let mut park_duration = stats_interval.saturating_sub(last_stats_time.elapsed());
                if let Some(sleep_duration) = sleep_duration {
                    park_duration = std::cmp::min(sleep_duration, park_duration);
                }
                self.timely_worker.step_or_park(Some(park_duration));
            } else {
                self.timely_worker.step();
            }

            // Report any dropped IDs.
            for id in std::mem::take(&mut self.storage_state.dropped_ids) {
                self.send_storage_response(&response_tx, StorageResponse::DroppedId(id));
            }

            self.process_oneshot_ingestions(&response_tx);

            self.report_status_updates(&response_tx);

            if last_stats_time.elapsed() >= stats_interval {
                self.report_storage_statistics(&response_tx);
                last_stats_time = Instant::now();
            }

            // Handle any received commands.
            loop {
                match command_rx.try_recv() {
                    Ok(cmd) => self.storage_state.handle_storage_command(cmd),
                    Err(TryRecvError::Empty) => break,
                    Err(TryRecvError::Disconnected) => {
                        disconnected = true;
                        break;
                    }
                }
            }

            // Handle responses from the async worker.
            while let Ok(response) = self.storage_state.async_worker.try_recv() {
                self.handle_async_worker_response(response);
            }

            // Handle any received internal commands.
            while let Some(command) = self.storage_state.internal_cmd_rx.try_recv() {
                self.handle_internal_storage_command(command);
            }
        }
    }

    /// Entry point for applying a response from the async storage worker.
    pub fn handle_async_worker_response(
        &self,
        async_response: AsyncStorageWorkerResponse<mz_repr::Timestamp>,
    ) {
        // NOTE: If we want to share the load of async processing we
        // have to change `handle_storage_command` and change this
        // assert.
        assert_eq!(
            self.timely_worker.index(),
            0,
            "only worker #0 is doing async processing"
        );
        match async_response {
            AsyncStorageWorkerResponse::FrontiersUpdated {
                id,
                ingestion_description,
                as_of,
                resume_uppers,
                source_resume_uppers,
            } => {
                self.storage_state.internal_cmd_tx.send(
                    InternalStorageCommand::CreateIngestionDataflow {
                        id,
                        ingestion_description,
                        as_of,
                        resume_uppers,
                        source_resume_uppers,
                    },
                );
            }
            AsyncStorageWorkerResponse::DropDataflow(id) => {
                self.storage_state
                    .internal_cmd_tx
                    .send(InternalStorageCommand::DropDataflow(vec![id]));
            }
        }
    }

    /// Entry point for applying an internal storage command.
    pub fn handle_internal_storage_command(&mut self, internal_cmd: InternalStorageCommand) {
        match internal_cmd {
            InternalStorageCommand::SuspendAndRestart { id, reason } => {
                info!(
                    "worker {}/{} initiating suspend-and-restart for {id} because of: {reason}",
                    self.timely_worker.index(),
                    self.timely_worker.peers(),
                );

                let maybe_ingestion = self.storage_state.ingestions.get(&id).cloned();
                if let Some(ingestion_description) = maybe_ingestion {
                    // Yank the token of the previously existing source dataflow. Note that this
                    // token also includes any source exports/subsources.
                    let maybe_token = self.storage_state.source_tokens.remove(&id);
                    if maybe_token.is_none() {
                        // Something has dropped the source. Make sure we don't
                        // accidentally re-create it.
                        return;
                    }

                    // This needs to be done by one worker, which will
                    // broadcast a `CreateIngestionDataflow` command to all
                    // workers based on the response that contains the
                    // resumption upper.
                    //
                    // Doing this separately on each worker could lead to
                    // differing resume_uppers which might lead to all kinds of
                    // mayhem.
                    //
                    // TODO(aljoscha): If we ever become worried that this is
                    // putting undue pressure on worker 0 we can pick the
                    // designated worker for a source/sink based on `id.hash()`.
                    if self.timely_worker.index() == 0 {
                        for (id, _) in ingestion_description.source_exports.iter() {
                            self.storage_state
                                .aggregated_statistics
                                .advance_global_epoch(*id);
                        }
                        self.storage_state
                            .async_worker
                            .update_frontiers(id, ingestion_description);
                    }

                    // Continue with other commands.
                    return;
                }

                let maybe_sink = self.storage_state.exports.get(&id).cloned();
                if let Some(sink_description) = maybe_sink {
                    // Yank the token of the previously existing sink
                    // dataflow.
                    let maybe_token = self.storage_state.sink_tokens.remove(&id);

                    if maybe_token.is_none() {
                        // Something has dropped the sink. Make sure we don't
                        // accidentally re-create it.
                        return;
                    }

                    // This needs to be broadcast by one worker and go through
                    // the internal command fabric, to ensure consistent
                    // ordering of dataflow rendering across all workers.
                    if self.timely_worker.index() == 0 {
                        self.storage_state
                            .aggregated_statistics
                            .advance_global_epoch(id);
                        self.storage_state.internal_cmd_tx.send(
                            InternalStorageCommand::RunSinkDataflow(id, sink_description),
                        );
                    }

                    // Continue with other commands.
                    return;
                }

                if !self
                    .storage_state
                    .ingestions
                    .values()
                    .any(|v| v.source_exports.contains_key(&id))
                {
                    // Our current approach to dropping a source results in a race between shard
                    // finalization (which happens in the controller) and dataflow shutdown (which
                    // happens in clusterd). If a source is created and dropped fast enough -or the
                    // two commands get sufficiently delayed- then it's possible to receive a
                    // SuspendAndRestart command for an unknown source. We cannot assert that this
                    // never happens but we log a warning here to track how often this happens.
                    warn!(
                        "got InternalStorageCommand::SuspendAndRestart for something that is not a source or sink: {id}"
                    );
                }
            }
            InternalStorageCommand::CreateIngestionDataflow {
                id: ingestion_id,
                mut ingestion_description,
                as_of,
                mut resume_uppers,
                mut source_resume_uppers,
            } => {
                info!(
                    ?as_of,
                    ?resume_uppers,
                    "worker {}/{} trying to (re-)start ingestion {ingestion_id}",
                    self.timely_worker.index(),
                    self.timely_worker.peers(),
                );

                // We initialize statistics before we prune finished exports. We
                // still want to export statistics for these, plus the rendering
                // machinery will get confused if there are not at least
                // statistics for the "main" source.
                for (export_id, export) in ingestion_description.source_exports.iter() {
                    let resume_upper = resume_uppers[export_id].clone();
                    self.storage_state.aggregated_statistics.initialize_source(
                        *export_id,
                        resume_upper.clone(),
                        || {
                            SourceStatistics::new(
                                *export_id,
                                self.storage_state.timely_worker_index,
                                &self.storage_state.metrics.source_statistics,
                                ingestion_id,
                                &export.storage_metadata.data_shard,
                                export.data_config.envelope.clone(),
                                resume_upper,
                            )
                        },
                    );
                }

                let finished_exports: BTreeSet<GlobalId> = resume_uppers
                    .iter()
                    .filter(|(_, frontier)| frontier.is_empty())
                    .map(|(id, _)| *id)
                    .collect();

                resume_uppers.retain(|id, _| !finished_exports.contains(id));
                source_resume_uppers.retain(|id, _| !finished_exports.contains(id));
                ingestion_description
                    .source_exports
                    .retain(|id, _| !finished_exports.contains(id));

                for id in ingestion_description.collection_ids() {
                    // If there is already a shared upper, we re-use it, to make
                    // sure that parties that are already using the shared upper
                    // can continue doing so.
                    let source_upper = self
                        .storage_state
                        .source_uppers
                        .entry(id.clone())
                        .or_insert_with(|| {
                            Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())))
                        });

                    let mut source_upper = source_upper.borrow_mut();
                    if !source_upper.is_empty() {
                        source_upper.clear();
                        source_upper.insert(mz_repr::Timestamp::minimum());
                    }
                }

                // If all subsources of the source are finished, we can skip rendering entirely.
                // Also, if `as_of` is empty, the dataflow has been finalized, so we can skip it
                // as well.
                //
                // TODO(guswynn|petrosagg): this is a bit hacky, and is a consequence of storage
                // state management being a bit of a mess. we should clean this up and remove
                // weird if statements like this.
                if resume_uppers.values().all(|frontier| frontier.is_empty()) || as_of.is_empty() {
                    tracing::info!(
                        ?resume_uppers,
                        ?as_of,
                        "worker {}/{} skipping building ingestion dataflow \
                        for {ingestion_id} because the ingestion is finished",
                        self.timely_worker.index(),
                        self.timely_worker.peers(),
                    );
                    return;
                }

                crate::render::build_ingestion_dataflow(
                    self.timely_worker,
                    &mut self.storage_state,
                    ingestion_id,
                    ingestion_description,
                    as_of,
                    resume_uppers,
                    source_resume_uppers,
                );
            }
            InternalStorageCommand::RunOneshotIngestion {
                ingestion_id,
                collection_id,
                collection_meta,
                request,
            } => {
                crate::render::build_oneshot_ingestion_dataflow(
                    self.timely_worker,
                    &mut self.storage_state,
                    ingestion_id,
                    collection_id,
                    collection_meta,
                    request,
                );
            }
            InternalStorageCommand::RunSinkDataflow(sink_id, sink_description) => {
                info!(
                    "worker {}/{} trying to (re-)start sink {sink_id}",
                    self.timely_worker.index(),
                    self.timely_worker.peers(),
                );

                {
                    // If there is already a shared write frontier, we re-use it, to
                    // make sure that parties that are already using the shared
                    // frontier can continue doing so.
                    let sink_write_frontier = self
                        .storage_state
                        .sink_write_frontiers
                        .entry(sink_id.clone())
                        .or_insert_with(|| Rc::new(RefCell::new(Antichain::new())));

                    let mut sink_write_frontier = sink_write_frontier.borrow_mut();
                    sink_write_frontier.clear();
                    sink_write_frontier.insert(mz_repr::Timestamp::minimum());
                }
                self.storage_state
                    .aggregated_statistics
                    .initialize_sink(sink_id, || {
                        SinkStatistics::new(
                            sink_id,
                            self.storage_state.timely_worker_index,
                            &self.storage_state.metrics.sink_statistics,
                        )
                    });

                crate::render::build_export_dataflow(
                    self.timely_worker,
                    &mut self.storage_state,
                    sink_id,
                    sink_description,
                );
            }
            InternalStorageCommand::DropDataflow(ids) => {
                for id in &ids {
                    // Clean up per-source / per-sink state.
                    self.storage_state.source_uppers.remove(id);
                    self.storage_state.source_tokens.remove(id);

                    self.storage_state.sink_tokens.remove(id);

                    self.storage_state.aggregated_statistics.deinitialize(*id);
                }
            }
            InternalStorageCommand::UpdateConfiguration { storage_parameters } => {
                self.storage_state
                    .dataflow_parameters
                    .update(storage_parameters.clone());
                self.storage_state
                    .storage_configuration
                    .update(storage_parameters);

                // Clear out the updates as we no longer forward them to anyone else to process.
                // We clone `StorageState::storage_configuration` many times during rendering
                // and want to avoid cloning these unused updates.
                self.storage_state
                    .storage_configuration
                    .parameters
                    .dyncfg_updates = Default::default();

                // Remember the maintenance interval locally to avoid reading it from the config
                // set on every server iteration.
                self.storage_state.server_maintenance_interval =
                    STORAGE_SERVER_MAINTENANCE_INTERVAL
                        .get(self.storage_state.storage_configuration.config_set());
            }
            InternalStorageCommand::StatisticsUpdate { sources, sinks } => self
                .storage_state
                .aggregated_statistics
                .ingest(sources, sinks),
        }
    }

    /// Emit information about write frontier progress, along with information that should
    /// be made durable for this to be the case.
    ///
    /// The write frontier progress is "conditional" in that it is not until the information
    /// is made durable that the data are emitted to downstream workers, and indeed they should
    /// not rely on the completeness of what they hear until the information is made durable.
    ///
    /// Specifically, this sends information about new timestamp bindings created by dataflow
    /// workers, with the understanding that, if made durable (and ack'd back to the workers),
    /// the source will in fact progress with this write frontier.
    pub fn report_frontier_progress(&mut self, response_tx: &ResponseSender) {
        let mut new_uppers = Vec::new();

        // Check if any observed frontier should advance the reported frontiers.
        for (id, frontier) in self
            .storage_state
            .source_uppers
            .iter()
            .chain(self.storage_state.sink_write_frontiers.iter())
        {
            let Some(reported_frontier) = self.storage_state.reported_frontiers.get_mut(id) else {
                // Frontier reporting has not yet been started for this object.
                // Potentially because this timely worker has not yet seen the
                // `CreateSources` command.
                continue;
            };

            let observed_frontier = frontier.borrow();

            // Only do a thing if it *advances* the frontier, not just *changes* the frontier.
            // This is protection against `frontier` lagging behind what we have conditionally
            // reported.
            if PartialOrder::less_than(reported_frontier, &observed_frontier) {
                new_uppers.push((*id, observed_frontier.clone()));
                reported_frontier.clone_from(&observed_frontier);
            }
        }

        for (id, upper) in new_uppers {
            self.send_storage_response(response_tx, StorageResponse::FrontierUpper(id, upper));
        }
    }

    /// Pumps latest status updates from the buffer shared with operators and
    /// reports any updates that need reporting.
    pub fn report_status_updates(&mut self, response_tx: &ResponseSender) {
        // If we haven't done the initial status report, report all current statuses.
        if !self.storage_state.initial_status_reported {
            // We pull initially reported status updates to "now", so that they
            // sort as the latest update in internal status collections. This
            // makes it so that a newly bootstrapped envd can append status
            // updates to internal status collections that report an accurate
            // view as of the time when they came up.
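            // For example, a source that became `stalled` before envd
            // restarted is re-reported with the current wall-clock timestamp
            // rather than its original one.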
            let now_ts = mz_ore::now::to_datetime((self.storage_state.now)());
            let status_updates = self
                .storage_state
                .latest_status_updates
                .values()
                .cloned()
                .map(|mut update| {
                    update.timestamp = now_ts.clone();
                    update
                });
            for update in status_updates {
                self.send_storage_response(response_tx, StorageResponse::StatusUpdate(update));
            }
            self.storage_state.initial_status_reported = true;
        }

        // Pump updates into our state and stage them for reporting.
        for shared_update in self.storage_state.shared_status_updates.take() {
            self.send_storage_response(
                response_tx,
                StorageResponse::StatusUpdate(shared_update.clone()),
            );

            self.storage_state
                .latest_status_updates
                .insert(shared_update.id, shared_update);
        }
    }

    /// Report source statistics back to the controller.
    pub fn report_storage_statistics(&mut self, response_tx: &ResponseSender) {
        let (sources, sinks) = self.storage_state.aggregated_statistics.emit_local();
        if !sources.is_empty() || !sinks.is_empty() {
            self.storage_state
                .internal_cmd_tx
                .send(InternalStorageCommand::StatisticsUpdate { sources, sinks })
        }

        let (sources, sinks) = self.storage_state.aggregated_statistics.snapshot();
        if !sources.is_empty() || !sinks.is_empty() {
            self.send_storage_response(
                response_tx,
                StorageResponse::StatisticsUpdates(sources, sinks),
            );
        }
    }

    /// Send a response to the coordinator.
    fn send_storage_response(&self, response_tx: &ResponseSender, response: StorageResponse) {
        // Ignore send errors because the coordinator is free to ignore our
        // responses. This happens during shutdown.
        let _ = response_tx.send(response);
    }

    fn process_oneshot_ingestions(&mut self, response_tx: &ResponseSender) {
        use tokio::sync::mpsc::error::TryRecvError;

        let mut to_remove = vec![];

        for (ingestion_id, ingestion_state) in &mut self.storage_state.oneshot_ingestions {
            loop {
                match ingestion_state.results.try_recv() {
                    Ok(result) => {
                        let response = match result {
                            Ok(maybe_batch) => maybe_batch.into_iter().map(Result::Ok).collect(),
                            Err(err) => vec![Err(err)],
                        };
                        let staged_batches = BTreeMap::from([(*ingestion_id, response)]);
                        let _ = response_tx.send(StorageResponse::StagedBatches(staged_batches));
                    }
                    Err(TryRecvError::Empty) => {
                        break;
                    }
                    Err(TryRecvError::Disconnected) => {
                        to_remove.push(*ingestion_id);
                        break;
                    }
                }
            }
        }

        for ingestion_id in to_remove {
            tracing::info!(?ingestion_id, "removing oneshot ingestion");
            self.storage_state.oneshot_ingestions.remove(&ingestion_id);
        }
    }

    /// Extracts commands until `InitializationComplete`, and makes the worker
    /// reflect those commands. If the worker cannot be made to reflect the
    /// commands, returns an error.
    fn reconcile(&mut self, command_rx: &CommandReceiver) -> Result<(), RecvError> {
        let worker_id = self.timely_worker.index();

        // To initialize the connection, we want to drain all commands until we
        // receive a `StorageCommand::InitializationComplete` command to form a
        // target command state.
        let mut commands = vec![];
        loop {
            match command_rx.recv()? {
                StorageCommand::InitializationComplete => break,
                command => commands.push(command),
            }
        }

        // Track which objects this envd expects; we will also reset their
        // reported frontiers to the minimum timestamp, as we don't know what
        // frontiers the new envd expects.
        let mut expected_objects = BTreeSet::new();

        let mut drop_commands = BTreeSet::new();
        let mut running_ingestion_descriptions = self.storage_state.ingestions.clone();
        let mut running_exports_descriptions = self.storage_state.exports.clone();

        let mut create_oneshot_ingestions: BTreeSet<Uuid> = BTreeSet::new();
        let mut cancel_oneshot_ingestions: BTreeSet<Uuid> = BTreeSet::new();

        for command in &mut commands {
            match command {
                StorageCommand::CreateTimely { .. } => {
                    panic!("CreateTimely must be captured before")
                }
                StorageCommand::AllowCompaction(id, since) => {
                    info!(%worker_id, ?id, ?since, "reconcile: received AllowCompaction command");

                    // Collect all "drop commands". These are `AllowCompaction`
                    // commands that compact to the empty since. Then, later, we make sure
                    // we retain only those `Create*` commands that are not dropped. We
                    // assume that the `AllowCompaction` command is ordered after the
                    // `Create*` commands but don't assert that.
                    // WIP: Should we assert?
                    if since.is_empty() {
                        drop_commands.insert(*id);
                    }
                }
                StorageCommand::RunIngestion(ingestion) => {
                    info!(%worker_id, ?ingestion, "reconcile: received RunIngestion command");

                    // Ensure that ingestions are forward-rolling alter compatible.
                    let prev = running_ingestion_descriptions
                        .insert(ingestion.id, ingestion.description.clone());

                    if let Some(prev_ingest) = prev {
                        // If the new ingestion is not exactly equal to the currently running
                        // ingestion, we must either track that we need to synthesize an update
                        // command to change the ingestion, or panic.
                        prev_ingest
                            .alter_compatible(ingestion.id, &ingestion.description)
                            .expect("only alter compatible ingestions permitted");
                    }
                }
                StorageCommand::RunSink(export) => {
                    info!(%worker_id, ?export, "reconcile: received RunSink command");

                    // Ensure that exports are forward-rolling alter compatible.
                    let prev =
                        running_exports_descriptions.insert(export.id, export.description.clone());

                    if let Some(prev_export) = prev {
                        prev_export
                            .alter_compatible(export.id, &export.description)
                            .expect("only alter compatible exports permitted");
                    }
                }
                StorageCommand::RunOneshotIngestion(ingestion) => {
                    info!(%worker_id, ?ingestion, "reconcile: received RunOneshotIngestion command");
                    create_oneshot_ingestions.insert(ingestion.ingestion_id);
                }
                StorageCommand::CancelOneshotIngestion(uuid) => {
                    info!(%worker_id, %uuid, "reconcile: received CancelOneshotIngestion command");
                    cancel_oneshot_ingestions.insert(*uuid);
                }
                StorageCommand::InitializationComplete
                | StorageCommand::AllowWrites
                | StorageCommand::UpdateConfiguration(_) => (),
            }
        }

        let mut seen_most_recent_definition = BTreeSet::new();

        // We iterate over this backward to ensure that we keep only the most recent ingestion
        // description.
        let mut filtered_commands = VecDeque::new();
        for mut command in commands.into_iter().rev() {
            let mut should_keep = true;
            match &mut command {
                StorageCommand::CreateTimely { .. } => {
                    panic!("CreateTimely must be captured before")
                }
                StorageCommand::RunIngestion(ingestion) => {
                    // Subsources can be dropped independently of their
                    // primary source, so we evaluate them in a separate
                    // loop.
                    for export_id in ingestion
                        .description
                        .source_exports
                        .keys()
                        .filter(|export_id| **export_id != ingestion.id)
                    {
                        if drop_commands.remove(export_id) {
                            info!(%worker_id, %export_id, "reconcile: dropping subsource");
                            self.storage_state.dropped_ids.push(*export_id);
                        }
                    }

                    if drop_commands.remove(&ingestion.id)
                        || self.storage_state.dropped_ids.contains(&ingestion.id)
                    {
                        info!(%worker_id, %ingestion.id, "reconcile: dropping ingestion");

                        // If an ingestion is dropped, so too must all of
                        // its subsources (i.e. ingestion exports, as well
                        // as its progress subsource).
                        for id in ingestion.description.collection_ids() {
                            drop_commands.remove(&id);
                            self.storage_state.dropped_ids.push(id);
                        }
                        should_keep = false;
                    } else {
                        let most_recent_definition =
                            seen_most_recent_definition.insert(ingestion.id);

                        if most_recent_definition {
                            // If this is the most recent definition, this
                            // is what we will be running when
                            // reconciliation completes. This definition
                            // must not include any dropped subsources.
                            ingestion.description.source_exports.retain(|export_id, _| {
                                !self.storage_state.dropped_ids.contains(export_id)
                            });

                            // After clearing any dropped subsources, we can
                            // state that we expect all of these to exist.
                            expected_objects.extend(ingestion.description.collection_ids());
                        }

                        let running_ingestion = self.storage_state.ingestions.get(&ingestion.id);

                        // We keep only:
                        // - The most recent version of the ingestion, which
                        //   is why these commands are run in reverse.
                        // - Ingestions whose descriptions are not exactly
                        //   those that are currently running.
                        should_keep = most_recent_definition
                            && running_ingestion != Some(&ingestion.description);
                    }
                }
                StorageCommand::RunSink(export) => {
                    if drop_commands.remove(&export.id)
                        // If there were multiple `RunSink` in the command
                        // stream, we want to ensure none of them are
                        // retained.
                        || self.storage_state.dropped_ids.contains(&export.id)
                    {
                        info!(%worker_id, %export.id, "reconcile: dropping sink");

                        // Make sure that we report back that the ID was
                        // dropped.
                        self.storage_state.dropped_ids.push(export.id);

                        should_keep = false;
                    } else {
                        expected_objects.insert(export.id);

                        let running_sink = self.storage_state.exports.get(&export.id);

                        // We keep only:
                        // - The most recent version of the sink, which
                        //   is why these commands are run in reverse.
                        // - Sinks whose descriptions are not exactly
                        //   those that are currently running.
                        should_keep = seen_most_recent_definition.insert(export.id)
                            && running_sink != Some(&export.description);
                    }
                }
                StorageCommand::RunOneshotIngestion(ingestion) => {
                    let already_running = self
                        .storage_state
                        .oneshot_ingestions
                        .contains_key(&ingestion.ingestion_id);
                    let was_canceled = cancel_oneshot_ingestions.contains(&ingestion.ingestion_id);

                    should_keep = !already_running && !was_canceled;
                }
                StorageCommand::CancelOneshotIngestion(ingestion_id) => {
                    let already_running = self
                        .storage_state
                        .oneshot_ingestions
                        .contains_key(ingestion_id);
                    should_keep = already_running;
                }
                StorageCommand::InitializationComplete
                | StorageCommand::AllowWrites
                | StorageCommand::UpdateConfiguration(_)
                | StorageCommand::AllowCompaction(_, _) => (),
            }
            if should_keep {
                filtered_commands.push_front(command);
            }
        }
        let commands = filtered_commands;

        // Make sure all the "drop commands" matched up with a source or sink.
        // This is also what the regular handler logic for `AllowCompaction`
        // would do.
        soft_assert_or_log!(
            drop_commands.is_empty(),
            "AllowCompaction commands for non-existent IDs {:?}",
            drop_commands
        );

        // Determine the ID of all objects we did _not_ see; these are
        // considered stale.
        let stale_objects = self
            .storage_state
            .ingestions
            .values()
            .flat_map(|i| i.collection_ids())
            .chain(self.storage_state.exports.keys().copied())
            // Objects are considered stale if we did not see them re-created.
            .filter(|id| !expected_objects.contains(id))
            .collect::<Vec<_>>();
        let stale_oneshot_ingestions = self
            .storage_state
            .oneshot_ingestions
            .keys()
            .filter(|ingestion_id| {
                let created = create_oneshot_ingestions.contains(ingestion_id);
                let dropped = cancel_oneshot_ingestions.contains(ingestion_id);
                mz_ore::soft_assert_or_log!(
                    created || !dropped,
                    "dropped non-existent oneshot source"
                );
                !created && !dropped
            })
            .copied()
            .collect::<Vec<_>>();

        info!(
            %worker_id, ?expected_objects, ?stale_objects, ?stale_oneshot_ingestions,
            "reconcile: modifying storage state to match expected objects",
        );

        for id in stale_objects {
            self.storage_state.drop_collection(id);
        }
        for id in stale_oneshot_ingestions {
            self.storage_state.drop_oneshot_ingestion(id);
        }

        // Do not report dropping any objects that do not belong to expected
        // objects.
        self.storage_state
            .dropped_ids
            .retain(|id| expected_objects.contains(id));

        // Do not report any frontiers that do not belong to expected objects.
        // Note that this set of objects can differ from the set of sources and
        // sinks.
        self.storage_state
            .reported_frontiers
            .retain(|id, _| expected_objects.contains(id));

        // Reset the reported frontiers for the remaining objects.
        for (_, frontier) in &mut self.storage_state.reported_frontiers {
            *frontier = Antichain::from_elem(<_>::minimum());
        }

        // Reset the initial status reported flag when a new client connects.
        self.storage_state.initial_status_reported = false;

        // Execute the modified commands.
        for command in commands {
            self.storage_state.handle_storage_command(command);
        }

        Ok(())
    }
}

impl StorageState {
    /// Entry point for applying a storage command.
    ///
    /// NOTE: This does not have access to the timely worker and therefore
    /// cannot render dataflows. For dataflow rendering, this needs to either
    /// send asynchronous commands to the `async_worker` or internal
    /// commands to the `internal_cmd_tx`.
    pub fn handle_storage_command(&mut self, cmd: StorageCommand) {
        match cmd {
            StorageCommand::CreateTimely { .. } => panic!("CreateTimely must be captured before"),
            StorageCommand::InitializationComplete => (),
            StorageCommand::AllowWrites => {
                self.read_only_tx
                    .send(false)
                    .expect("we're holding one other end");
                self.persist_clients.cfg().enable_compaction();
            }
            StorageCommand::UpdateConfiguration(params) => {
                // These can be done from all workers safely.
                tracing::info!("Applying configuration update: {params:?}");

                // We serialize the dyncfg updates in StorageParameters, but configure
                // persist separately.
                self.persist_clients
                    .cfg()
                    .apply_from(&params.dyncfg_updates);

                params.tracing.apply(self.tracing_handle.as_ref());

                if let Some(log_filter) = &params.tracing.log_filter {
                    self.storage_configuration
                        .connection_context
                        .librdkafka_log_level =
                        mz_ore::tracing::crate_level(&log_filter.clone().into(), "librdkafka");
                }

                // This needs to be broadcast by one worker and go through
                // the internal command fabric, to ensure consistent
                // ordering of dataflow rendering across all workers.
                if self.timely_worker_index == 0 {
                    self.internal_cmd_tx
                        .send(InternalStorageCommand::UpdateConfiguration {
                            storage_parameters: *params,
                        })
                }
            }
            StorageCommand::RunIngestion(ingestion) => {
                let RunIngestionCommand { id, description } = *ingestion;

                // Remember the ingestion description to facilitate possible
                // reconciliation later.
                self.ingestions.insert(id, description.clone());

                // Initialize shared frontier reporting.
                for id in description.collection_ids() {
                    self.reported_frontiers
                        .entry(id)
                        .or_insert_with(|| Antichain::from_elem(mz_repr::Timestamp::minimum()));
                }

                // This needs to be done by one worker, which will broadcast a
                // `CreateIngestionDataflow` command to all workers based on the response that
                // contains the resumption upper.
                //
                // Doing this separately on each worker could lead to differing resume_uppers
                // which might lead to all kinds of mayhem.
                //
                // N.B. the ingestion on each worker uses the description from worker 0, not the
                // ingestion in the local storage state. This is something we might have
                // interest in fixing in the future, e.g. materialize#19907.
                if self.timely_worker_index == 0 {
                    self.async_worker.update_frontiers(id, description);
                }
            }
            StorageCommand::RunOneshotIngestion(oneshot) => {
                if self.timely_worker_index == 0 {
                    self.internal_cmd_tx
                        .send(InternalStorageCommand::RunOneshotIngestion {
                            ingestion_id: oneshot.ingestion_id,
                            collection_id: oneshot.collection_id,
                            collection_meta: oneshot.collection_meta,
                            request: oneshot.request,
                        });
                }
            }
            StorageCommand::CancelOneshotIngestion(id) => {
                self.drop_oneshot_ingestion(id);
            }
            StorageCommand::RunSink(export) => {
                // Remember the sink description to facilitate possible
                // reconciliation later.
                let prev = self.exports.insert(export.id, export.description.clone());

                // New sink, add state.
                if prev.is_none() {
                    self.reported_frontiers.insert(
                        export.id,
                        Antichain::from_elem(mz_repr::Timestamp::minimum()),
                    );
                }

                // This needs to be broadcast by one worker and go through the internal command
                // fabric, to ensure consistent ordering of dataflow rendering across all
                // workers.
                if self.timely_worker_index == 0 {
                    self.internal_cmd_tx
                        .send(InternalStorageCommand::RunSinkDataflow(
                            export.id,
                            export.description,
                        ));
                }
            }
            StorageCommand::AllowCompaction(id, frontier) => {
                match self.exports.get_mut(&id) {
                    Some(export_description) => {
                        // Update our knowledge of the `as_of`, in case we need to internally
                        // restart a sink in the future.
                        export_description.as_of.clone_from(&frontier);
                    }
                    // `reported_frontiers` contains both ingestions and their
                    // exports.
                    None if self.reported_frontiers.contains_key(&id) => (),
                    None => {
                        soft_panic_or_log!("AllowCompaction command for non-existent {id}");
                    }
                }

                if frontier.is_empty() {
                    // Indicates that we may drop `id`, as there are no more valid times to read.
                    self.drop_collection(id);
                }
            }
        }
    }

    /// Drop the identified storage collection from the storage state.
    fn drop_collection(&mut self, id: GlobalId) {
        fail_point!("crash_on_drop");

        self.ingestions.remove(&id);
        self.exports.remove(&id);

        let _ = self.latest_status_updates.remove(&id);

        // This will stop reporting of frontiers.
        //
        // If this object still has its frontiers reported, we will notify the
        // client envd of the drop.
        if self.reported_frontiers.remove(&id).is_some() {
            // The only actions left are internal cleanup, so we can commit to
            // the client that these objects have been dropped.
            //
            // This must be done now rather than in response to `DropDataflow`,
            // otherwise we introduce the possibility of a timing issue where:
            // - We remove all tracking state from the storage state and send
            //   `DropDataflow` (i.e. this block).
            // - While waiting to process that command, we reconcile with a new
            //   envd. That envd has already committed to its catalog that this
            //   object no longer exists.
            // - We process the `DropDataflow` command, and identify that this
            //   object has been dropped.
            // - The next time `dropped_ids` is processed, we send a response
            //   that this ID has been dropped, but the upstream state has no
            //   record of that object having ever existed.
            self.dropped_ids.push(id);
        }

        // We route this through the async worker to ensure correct ordering
        // with `RunIngestion`; the dataflow itself is dropped when the async
        // worker's response is handled.
        if self.timely_worker_index == 0 {
            self.async_worker.drop_dataflow(id);
        }
    }

    /// Drop the identified oneshot ingestion from the storage state.
    fn drop_oneshot_ingestion(&mut self, ingestion_id: uuid::Uuid) {
        let prev = self.oneshot_ingestions.remove(&ingestion_id);
        tracing::info!(%ingestion_id, existed = %prev.is_some(), "dropping oneshot ingestion");
    }
}
1446}