Skip to main content

mz_compute/
compute_state.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Worker-local state for compute timely instances.
7
8use std::any::Any;
9use std::cell::RefCell;
10use std::cmp::Ordering;
11use std::collections::{BTreeMap, BTreeSet};
12use std::num::NonZeroUsize;
13use std::rc::Rc;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use bytesize::ByteSize;
18use differential_dataflow::Hashable;
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::implementations::BatchContainer;
21use differential_dataflow::trace::{Cursor, TraceReader};
22use mz_compute_client::logging::LoggingConfig;
23use mz_compute_client::protocol::command::{
24    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
25};
26use mz_compute_client::protocol::history::ComputeCommandHistory;
27use mz_compute_client::protocol::response::{
28    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, SubscribeResponse,
29};
30use mz_compute_types::dataflows::DataflowDescription;
31use mz_compute_types::dyncfgs::{
32    ENABLE_PEEK_RESPONSE_STASH, PEEK_RESPONSE_STASH_BATCH_MAX_RUNS,
33    PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE, PEEK_STASH_NUM_BATCHES,
34};
35use mz_compute_types::plan::render_plan::RenderPlan;
36use mz_dyncfg::ConfigSet;
37use mz_expr::row::RowCollection;
38use mz_expr::{RowComparator, SafeMfpPlan};
39use mz_ore::cast::{CastFrom, CastLossy};
40use mz_ore::collections::CollectionExt;
41use mz_ore::metrics::{MetricsRegistry, UIntGauge};
42use mz_ore::now::EpochMillis;
43use mz_ore::soft_panic_or_log;
44use mz_ore::task::AbortOnDropHandle;
45use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
46use mz_persist_client::Diagnostics;
47use mz_persist_client::cache::PersistClientCache;
48use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
49use mz_persist_client::read::ReadHandle;
50use mz_persist_types::PersistLocation;
51use mz_persist_types::codec_impls::UnitSchema;
52use mz_repr::fixed_length::ExtendDatums;
53use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
54use mz_storage_operators::stats::StatsCursor;
55use mz_storage_types::StorageDiff;
56use mz_storage_types::controller::CollectionMetadata;
57use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
58use mz_storage_types::sources::SourceData;
59use mz_storage_types::time_dependence::TimeDependence;
60use mz_txn_wal::operator::TxnsContext;
61use mz_txn_wal::txn_cache::TxnsCache;
62use timely::dataflow::operators::probe;
63use timely::order::PartialOrder;
64use timely::progress::frontier::Antichain;
65use timely::worker::Worker as TimelyWorker;
66use tokio::sync::{oneshot, watch};
67use tracing::{Level, debug, error, info, span, trace, warn};
68use uuid::Uuid;
69
70use crate::arrangement::manager::{TraceBundle, TraceManager};
71use crate::logging;
72use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
73use crate::logging::initialize::LoggingTraces;
74use crate::metrics::{CollectionMetrics, WorkerMetrics};
75use crate::render::{LinearJoinSpec, StartSignal};
76use crate::server::{ComputeInstanceContext, ResponseSender};
77
78mod peek_result_iterator;
79mod peek_stash;
80
/// Worker-local state that is maintained across dataflows.
///
/// This state is restricted to the COMPUTE state, the deterministic, idempotent work
/// done between data ingress and egress.
pub struct ComputeState {
    /// State kept for each installed compute collection.
    ///
    /// Each collection has exactly one frontier.
    /// How the frontier is communicated depends on the collection type:
    ///  * Frontiers of indexes are equal to the frontier of their corresponding traces in the
    ///    `TraceManager`.
    ///  * Persist sinks store their current frontier in `CollectionState::sink_write_frontier`.
    ///  * Subscribes report their frontiers through the `subscribe_response_buffer`.
    pub collections: BTreeMap<GlobalId, CollectionState>,
    /// The traces available for sharing across dataflows.
    pub traces: TraceManager,
    /// Shared buffer with SUBSCRIBE operator instances by which they can respond.
    ///
    /// The entries are pairs of sink identifier (to identify the subscribe instance)
    /// and the response itself.
    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
    /// Shared buffer with S3 oneshot operator instances by which they can respond.
    ///
    /// The entries are pairs of sink identifier (to identify the S3 oneshot instance)
    /// and the response itself.
    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
    /// Peek commands that are awaiting fulfillment, keyed by peek UUID.
    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// The persist location where we can stash large peek results.
    ///
    /// `None` until the instance has been created.
    pub peek_stash_persist_location: Option<PersistLocation>,
    /// The logger, from Timely's logging framework, if logs are enabled.
    pub compute_logger: Option<logging::compute::Logger>,
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    /// This is intentionally shared between workers.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// History of commands received by this worker and all its peers.
    pub command_history: ComputeCommandHistory<UIntGauge>,
    /// Max size in bytes of any result.
    max_result_size: u64,
    /// Specification for rendering linear joins.
    pub linear_join_spec: LinearJoinSpec,
    /// Metrics for this worker.
    pub metrics: WorkerMetrics,
    /// A process-global handle to tracing configuration.
    tracing_handle: Arc<TracingHandle>,
    /// Other configuration for compute.
    pub context: ComputeInstanceContext,
    /// Per-worker dynamic configuration.
    ///
    /// This is separate from the process-global `ConfigSet` and contains config options that need
    /// to be applied consistently with compute command order.
    ///
    /// For example, for options that influence dataflow rendering it is important that all workers
    /// render the same dataflow with the same options. If these options were stored in a global
    /// `ConfigSet`, we couldn't guarantee that all workers observe changes to them at the same
    /// point in the stream of compute commands. Storing per-worker configuration ensures that
    /// because each worker's configuration is only updated once that worker observes the
    /// respective `UpdateConfiguration` command.
    ///
    /// Reference-counted to avoid cloning for `Context`.
    pub worker_config: Rc<ConfigSet>,

    /// The process-global metrics registry.
    pub metrics_registry: MetricsRegistry,

    /// The number of timely workers per process.
    pub workers_per_process: usize,

    /// Collections awaiting schedule instruction by the controller.
    ///
    /// Each entry stores a reference to a token that can be dropped to unsuspend the collection's
    /// dataflow. Multiple collections can reference the same token if they are exported by the
    /// same dataflow.
    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,

    /// Interval at which to perform server maintenance tasks. Set to a zero interval to
    /// perform maintenance with every `step_or_park` invocation.
    pub server_maintenance_interval: Duration,

    /// The [`mz_ore::now::SYSTEM_TIME`] at which the replica was started.
    ///
    /// Used to compute `replica_expiration`.
    pub init_system_time: EpochMillis,

    /// The maximum time for which the replica is expected to live. If not empty, dataflows in the
    /// replica can drop diffs associated with timestamps beyond the replica expiration.
    /// The replica will panic if such dataflows are not dropped before the replica has expired.
    pub replica_expiration: Antichain<Timestamp>,

    /// The storage worker forwards its introspection logs to the compute worker.
    ///
    /// Taken (left as `None`) when logging is initialized during instance creation.
    pub storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
}
175
176impl ComputeState {
177    /// Construct a new `ComputeState`.
178    pub fn new(
179        persist_clients: Arc<PersistClientCache>,
180        txns_ctx: TxnsContext,
181        metrics: WorkerMetrics,
182        tracing_handle: Arc<TracingHandle>,
183        context: ComputeInstanceContext,
184        metrics_registry: MetricsRegistry,
185        workers_per_process: usize,
186        storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
187    ) -> Self {
188        let traces = TraceManager::new(metrics.clone());
189        let command_history = ComputeCommandHistory::new(metrics.for_history());
190
191        Self {
192            collections: Default::default(),
193            traces,
194            subscribe_response_buffer: Default::default(),
195            copy_to_response_buffer: Default::default(),
196            pending_peeks: Default::default(),
197            peek_stash_persist_location: None,
198            compute_logger: None,
199            persist_clients,
200            txns_ctx,
201            command_history,
202            max_result_size: u64::MAX,
203            linear_join_spec: Default::default(),
204            metrics,
205            tracing_handle,
206            context,
207            worker_config: mz_dyncfgs::all_dyncfgs().into(),
208            metrics_registry,
209            workers_per_process,
210            suspended_collections: Default::default(),
211            server_maintenance_interval: Duration::ZERO,
212            init_system_time: mz_ore::now::SYSTEM_TIME(),
213            replica_expiration: Antichain::default(),
214            storage_log_reader,
215        }
216    }
217
218    /// Return a mutable reference to the identified collection.
219    ///
220    /// Panics if the collection doesn't exist.
221    pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
222        self.collections
223            .get_mut(&id)
224            .expect("collection must exist")
225    }
226
227    /// Construct a new frontier probe for the given input and add it to the state of the given
228    /// collections.
229    ///
230    /// The caller is responsible for attaching the returned probe handle to the respective
231    /// dataflow input stream.
232    pub fn input_probe_for(
233        &mut self,
234        input_id: GlobalId,
235        collection_ids: impl Iterator<Item = GlobalId>,
236    ) -> probe::Handle<Timestamp> {
237        let probe = probe::Handle::default();
238        for id in collection_ids {
239            if let Some(collection) = self.collections.get_mut(&id) {
240                collection.input_probes.insert(input_id, probe.clone());
241            }
242        }
243        probe
244    }
245
246    /// Apply the current `worker_config` to the compute state.
247    fn apply_worker_config(&mut self) {
248        use mz_compute_types::dyncfgs::*;
249
250        let config = &self.worker_config;
251
252        self.linear_join_spec = LinearJoinSpec::from_config(config);
253
254        if ENABLE_LGALLOC.get(config) {
255            if let Some(path) = &self.context.scratch_directory {
256                let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
257                let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
258                let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
259                let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
260                let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
261                info!(
262                    ?path,
263                    backgrund_interval=?interval,
264                    clear_bytes,
265                    eager_return,
266                    file_growth_dampener,
267                    local_buffer_bytes,
268                    "enabling lgalloc"
269                );
270                let background_worker_config = lgalloc::BackgroundWorkerConfig {
271                    interval,
272                    clear_bytes,
273                };
274                lgalloc::lgalloc_set_config(
275                    lgalloc::LgAlloc::new()
276                        .enable()
277                        .with_path(path.clone())
278                        .with_background_config(background_worker_config)
279                        .eager_return(eager_return)
280                        .file_growth_dampener(file_growth_dampener)
281                        .local_buffer_bytes(local_buffer_bytes),
282                );
283            } else {
284                debug!("not enabling lgalloc, scratch directory not specified");
285            }
286        } else {
287            info!("disabling lgalloc");
288            lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
289        }
290
291        // Pager backend selection follows scratch-directory availability:
292        // a scratch dir means the file backend; no scratch dir means swap.
293        // `set_scratch_dir` and `set_backend` are both idempotent, so calling
294        // on every `apply_worker_config` tick is safe. The pager module is
295        // only compiled on Unix targets (`mz_ore::pager` is `cfg(unix)`).
296        #[cfg(unix)]
297        if let Some(path) = &self.context.scratch_directory {
298            mz_ore::pager::set_scratch_dir(path.clone());
299            mz_ore::pager::set_backend(mz_ore::pager::Backend::File);
300        } else {
301            mz_ore::pager::set_backend(mz_ore::pager::Backend::Swap);
302        }
303
304        crate::memory_limiter::apply_limiter_config(config);
305
306        mz_ore::region::ENABLE_LGALLOC_REGION.store(
307            ENABLE_COLUMNATION_LGALLOC.get(config),
308            std::sync::atomic::Ordering::Relaxed,
309        );
310
311        // NB: arrangement dictionary compression is deliberately NOT applied here. Unlike the
312        // settings above, it is captured once at replica creation (see `handle_create_instance`
313        // and `InstanceConfig::arrangement_dictionary_compression`) and held fixed, so that
314        // flipping the flag does not retroactively change arrangements on existing replicas.
315
316        // Apply column-paged-batcher configuration. Routes through
317        // `apply_tiered_config`, which reuses a process-wide `TieredPolicy`
318        // singleton — operator-driven tunes mutate the existing atomics
319        // rather than installing a fresh policy with a fresh budget atomic
320        // that would orphan in-flight resident tickets.
321        //
322        // Backend selection mirrors the lower-level `mz_ore::pager`
323        // already configured above: file when a scratch directory is
324        // available, swap otherwise.
325        {
326            use mz_ore::pager::Backend;
327            use mz_timely_util::column_pager::{Codec, apply_tiered_config};
328
329            let enabled = ENABLE_COLUMN_PAGED_BATCHER_SPILL.get(config);
330            let codec = COLUMN_PAGED_BATCHER_LZ4.get(config).then_some(Codec::Lz4);
331            let swap_pageout = COLUMN_PAGED_BATCHER_SWAP_PAGEOUT.get(config);
332
333            // Budget derivation: fraction × announced memory limit, with a
334            // 128 MiB floor so the no-pressure case doesn't page per chunk.
335            // Falls back to a 4 GiB assumption if no limit was announced
336            // (e.g. dev environments).
337            const MIB: usize = 1024 * 1024;
338            const DEFAULT_MEM_LIMIT: usize = 4 * 1024 * MIB;
339            let mem_limit = crate::memory_limiter::get_memory_limit().unwrap_or(DEFAULT_MEM_LIMIT);
340            let fraction = COLUMN_PAGED_BATCHER_BUDGET_FRACTION.get(config).max(0.0);
341            let total = usize::cast_lossy(f64::cast_lossy(mem_limit) * fraction).max(128 * MIB);
342
343            let backend = if self.context.scratch_directory.is_some() {
344                Backend::File
345            } else {
346                Backend::Swap
347            };
348
349            debug!(
350                enabled,
351                ?backend,
352                ?codec,
353                swap_pageout,
354                fraction,
355                mem_limit,
356                budget_bytes = total,
357                "column-paged batcher: applying tiered config",
358            );
359            apply_tiered_config(enabled, total, backend, codec, swap_pageout);
360        }
361
362        // Remember the maintenance interval locally to avoid reading it from the config set on
363        // every server iteration.
364        self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);
365
366        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
367        match overflowing_behavior.parse() {
368            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
369            Err(err) => {
370                error!(
371                    err,
372                    overflowing_behavior, "Invalid value for ore_overflowing_behavior"
373                );
374            }
375        }
376    }
377
378    /// Apply the provided replica expiration `offset` by converting it to a frontier relative to
379    /// the replica's initialization system time.
380    ///
381    /// Only expected to be called once when creating the instance. Guards against calling it
382    /// multiple times by checking if the local expiration time is set.
383    pub fn apply_expiration_offset(&mut self, offset: Duration) {
384        if self.replica_expiration.is_empty() {
385            let offset: EpochMillis = offset
386                .as_millis()
387                .try_into()
388                .expect("duration must fit within u64");
389            let replica_expiration_millis = self.init_system_time + offset;
390            let replica_expiration = Timestamp::from(replica_expiration_millis);
391
392            info!(
393                offset = %offset,
394                replica_expiration_millis = %replica_expiration_millis,
395                replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
396                "setting replica expiration",
397            );
398            self.replica_expiration = Antichain::from_elem(replica_expiration);
399
400            // Record the replica expiration in the metrics.
401            self.metrics
402                .replica_expiration_timestamp_seconds
403                .set(replica_expiration.into());
404        }
405    }
406
407    /// Returns the cc or non-cc version of "dataflow_max_inflight_bytes", as
408    /// appropriate to this replica.
409    pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
410        use mz_compute_types::dyncfgs::{
411            DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
412        };
413
414        if self.persist_clients.cfg.is_cc_active {
415            DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
416        } else {
417            DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
418        }
419    }
420}
421
/// A wrapper around [ComputeState] with a live timely worker and response channel.
///
/// Holds only mutable borrows, all tied to the lifetime `'a`; it does not own any of the
/// state it operates on.
pub(crate) struct ActiveComputeState<'a> {
    /// The underlying Timely worker.
    pub timely_worker: &'a mut TimelyWorker,
    /// The compute state itself.
    pub compute_state: &'a mut ComputeState,
    /// The channel over which frontier information is reported.
    pub response_tx: &'a mut ResponseSender,
}
431
/// A token that keeps a sink alive.
///
/// The wrapped value is never inspected; it is only held so that it is dropped together with
/// the token.
// The field is intentionally never read: it exists solely for its `Drop` behavior.
pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);

impl SinkToken {
    /// Wrap the type-erased value `t` in a new `SinkToken`.
    pub fn new(t: Box<dyn Any>) -> Self {
        SinkToken(t)
    }
}
441
442impl<'a> ActiveComputeState<'a> {
443    /// Entrypoint for applying a compute command.
444    #[mz_ore::instrument(level = "debug")]
445    pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
446        use ComputeCommand::*;
447
448        self.compute_state.command_history.push(cmd.clone());
449
450        // Record the command duration, per worker and command kind.
451        let timer = self
452            .compute_state
453            .metrics
454            .handle_command_duration_seconds
455            .for_command(&cmd)
456            .start_timer();
457
458        match cmd {
459            Hello { .. } => panic!("Hello must be captured before"),
460            CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
461            InitializationComplete => (),
462            UpdateConfiguration(params) => self.handle_update_configuration(*params),
463            CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
464            Schedule(id) => self.handle_schedule(id),
465            AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
466            Peek(peek) => {
467                peek.otel_ctx.attach_as_parent();
468                self.handle_peek(*peek)
469            }
470            CancelPeek { uuid } => self.handle_cancel_peek(uuid),
471            AllowWrites(id) => {
472                self.handle_allow_writes(id);
473            }
474        }
475
476        timer.observe_duration();
477    }
478
479    fn handle_create_instance(&mut self, config: InstanceConfig) {
480        // Ensure the state is consistent with the config before we initialize anything.
481        self.compute_state.apply_worker_config();
482
483        // Apply dictionary compression exactly once, here at instance creation, from the value the
484        // controller captured when the replica was created. We deliberately do NOT re-apply it on
485        // `handle_update_configuration`, so flipping the flag does not retroactively change this
486        // replica's arrangements. `DICTIONARY_COMPRESSION` is process-global and a replica process
487        // hosts a single instance, so this single store covers all of the replica's arrangements.
488        mz_row_spine::DICTIONARY_COMPRESSION.store(
489            config.arrangement_dictionary_compression,
490            std::sync::atomic::Ordering::Relaxed,
491        );
492
493        if let Some(offset) = config.expiration_offset {
494            self.compute_state.apply_expiration_offset(offset);
495        }
496
497        let storage_log_reader = self.compute_state.storage_log_reader.take();
498        self.initialize_logging(config.logging, storage_log_reader);
499
500        self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
501    }
502
503    fn handle_update_configuration(&mut self, params: ComputeParameters) {
504        debug!("Applying configuration update: {params:?}");
505
506        let ComputeParameters {
507            workload_class,
508            max_result_size,
509            tracing,
510            grpc_client: _grpc_client,
511            dyncfg_updates,
512        } = params;
513
514        if let Some(v) = workload_class {
515            self.compute_state.metrics.set_workload_class(v);
516        }
517        if let Some(v) = max_result_size {
518            self.compute_state.max_result_size = v;
519        }
520
521        tracing.apply(self.compute_state.tracing_handle.as_ref());
522
523        dyncfg_updates.apply(&self.compute_state.worker_config);
524        self.compute_state
525            .persist_clients
526            .cfg()
527            .apply_from(&dyncfg_updates);
528
529        // Note: We're only updating mz_metrics from the compute state here, but not from the
530        // equivalent storage state. This is because they're running on the same process and
531        // share the metrics.
532        mz_metrics::update_dyncfg(&dyncfg_updates);
533
534        self.compute_state.apply_worker_config();
535    }
536
    /// Handle `CreateDataflow`: register collection state for every export of `dataflow`, park
    /// the exports behind a shared suspension token, and render the dataflow.
    ///
    /// # Panics
    ///
    /// Panics if `dataflow.as_of` is unset.
    fn handle_create_dataflow(
        &mut self,
        dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    ) {
        // The index is shared (via `Rc`) by the state of all exported collections.
        let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());
        let as_of = dataflow.as_of.clone().unwrap();

        // Without a time dependence there is no expiration (the default antichain).
        let dataflow_expiration = dataflow
            .time_dependence
            .as_ref()
            .map(|time_dependence| {
                self.determine_dataflow_expiration(time_dependence, &dataflow.until)
            })
            .unwrap_or_default();

        // Add the dataflow expiration to `until`.
        let until = dataflow.until.meet(&dataflow_expiration);

        // Both branches log the same fields; transient dataflows log at `debug` level, all
        // others at `info` level.
        if dataflow.is_transient() {
            debug!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration
                    .as_option()
                    .map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        } else {
            info!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration
                    .as_option()
                    .map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        };

        // Collected up front so each export can be classified with a set lookup below.
        let subscribe_copy_ids: BTreeSet<_> = dataflow
            .subscribe_ids()
            .chain(dataflow.copy_to_ids())
            .collect();

        // Initialize compute and logging state for each object.
        for object_id in dataflow.export_ids() {
            let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
            let metrics = self.compute_state.metrics.for_collection(object_id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of.clone(),
                metrics,
            );

            // Per-collection logging is only set up when a compute logger is installed.
            if let Some(logger) = self.compute_state.compute_logger.clone() {
                let logging = CollectionLogging::new(
                    object_id,
                    logger,
                    *dataflow_index,
                    dataflow.import_ids(),
                );
                collection.logging = Some(logging);
            }

            // Nothing has been reported yet; start all reported frontiers at the `as_of`.
            collection.reset_reported_frontiers(ReportedFrontier::NotReported {
                lower: as_of.clone(),
            });

            // A pre-existing entry is unexpected. It is replaced and the incident is logged.
            let existing = self.compute_state.collections.insert(object_id, collection);
            if existing.is_some() {
                error!(
                    id = ?object_id,
                    "existing collection for newly created dataflow",
                );
            }
        }

        // All exports share one suspension token; see `handle_schedule` for how it is released.
        let (start_signal, suspension_token) = StartSignal::new();
        for id in dataflow.export_ids() {
            self.compute_state
                .suspended_collections
                .insert(id, Rc::clone(&suspension_token));
        }

        crate::render::build_compute_dataflow(
            self.timely_worker,
            self.compute_state,
            dataflow,
            start_signal,
            until,
            dataflow_expiration,
        );
    }
642
643    fn handle_schedule(&mut self, id: GlobalId) {
644        // A `Schedule` command instructs us to begin dataflow computation for a collection, so
645        // we should unsuspend it by dropping the corresponding suspension token. Note that a
646        // dataflow can export multiple collections and they all share one suspension token, so the
647        // computation of a dataflow will only start once all its exported collections have been
648        // scheduled.
649        let suspension_token = self.compute_state.suspended_collections.remove(&id);
650        drop(suspension_token);
651    }
652
653    fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
654        if frontier.is_empty() {
655            // Indicates that we may drop `id`, as there are no more valid times to read.
656            self.drop_collection(id);
657        } else {
658            self.compute_state
659                .traces
660                .allow_compaction(id, frontier.borrow());
661        }
662    }
663
664    #[mz_ore::instrument(level = "debug")]
665    fn handle_peek(&mut self, peek: Peek) {
666        let pending = match &peek.target {
667            PeekTarget::Index { id } => {
668                // Acquire a copy of the trace suitable for fulfilling the peek.
669                let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
670                PendingPeek::index(peek, trace_bundle)
671            }
672            PeekTarget::Persist { metadata, .. } => {
673                let metadata = metadata.clone();
674                PendingPeek::persist(
675                    peek,
676                    Arc::clone(&self.compute_state.persist_clients),
677                    metadata,
678                    usize::cast_from(self.compute_state.max_result_size),
679                    self.timely_worker,
680                )
681            }
682        };
683
684        // Log the receipt of the peek.
685        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
686            logger.log(&pending.as_log_event(true));
687        }
688
689        self.process_peek(&mut Antichain::new(), pending);
690    }
691
692    fn handle_cancel_peek(&mut self, uuid: Uuid) {
693        if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
694            self.send_peek_response(peek, PeekResponse::Canceled);
695        }
696    }
697
698    fn handle_allow_writes(&mut self, id: GlobalId) {
699        // Enable persist compaction on any allow-writes command. We
700        // assume persist only compacts after making durable changes,
701        // such as appending a batch or advancing the upper.
702        self.compute_state.persist_clients.cfg().enable_compaction();
703
704        if let Some(collection) = self.compute_state.collections.get_mut(&id) {
705            collection.allow_writes();
706        } else {
707            soft_panic_or_log!("allow writes for unknown collection {id}");
708        }
709    }
710
711    /// Drop the given collection.
712    fn drop_collection(&mut self, id: GlobalId) {
713        let collection = self
714            .compute_state
715            .collections
716            .remove(&id)
717            .expect("dropped untracked collection");
718
719        // If this collection is an index, remove its trace.
720        self.compute_state.traces.remove(&id);
721        // If the collection is unscheduled, remove it from the list of waiting collections.
722        self.compute_state.suspended_collections.remove(&id);
723
724        // Drop the dataflow, if all its exports have been dropped.
725        if let Ok(index) = Rc::try_unwrap(collection.dataflow_index) {
726            self.timely_worker.drop_dataflow(index);
727        }
728
729        // The compute protocol requires us to send a `Frontiers` response with empty frontiers
730        // when a collection was dropped, unless:
731        //  * The frontier was already reported as empty previously, or
732        //  * The collection is a subscribe or copy-to.
733        if !collection.is_subscribe_or_copy {
734            let reported = collection.reported_frontiers;
735            let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
736            let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
737            let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);
738
739            let frontiers = FrontiersResponse {
740                write_frontier,
741                input_frontier,
742                output_frontier,
743            };
744            if frontiers.has_updates() {
745                self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
746            }
747        }
748    }
749
750    /// Initializes timely dataflow logging and publishes as a view.
751    pub fn initialize_logging(
752        &mut self,
753        config: LoggingConfig,
754        storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
755    ) {
756        if self.compute_state.compute_logger.is_some() {
757            panic!("dataflow server has already initialized logging");
758        }
759
760        let LoggingTraces {
761            traces,
762            dataflow_index,
763            compute_logger: logger,
764        } = logging::initialize(
765            self.timely_worker,
766            &config,
767            self.compute_state.metrics_registry.clone(),
768            Rc::clone(&self.compute_state.worker_config),
769            self.compute_state.workers_per_process,
770            storage_log_reader,
771        );
772
773        let dataflow_index = Rc::new(dataflow_index);
774        let mut log_index_ids = config.index_logs;
775        for (log, trace) in traces {
776            // Install trace as maintained index.
777            let id = log_index_ids
778                .remove(&log)
779                .expect("`logging::initialize` does not invent logs");
780            self.compute_state.traces.set(id, trace);
781
782            // Initialize compute and logging state for the logging index.
783            let is_subscribe_or_copy = false;
784            let as_of = Antichain::from_elem(Timestamp::MIN);
785            let metrics = self.compute_state.metrics.for_collection(id);
786            let mut collection = CollectionState::new(
787                Rc::clone(&dataflow_index),
788                is_subscribe_or_copy,
789                as_of,
790                metrics,
791            );
792
793            let logging =
794                CollectionLogging::new(id, logger.clone(), *dataflow_index, std::iter::empty());
795            collection.logging = Some(logging);
796
797            let existing = self.compute_state.collections.insert(id, collection);
798            if existing.is_some() {
799                error!(
800                    id = ?id,
801                    "existing collection for newly initialized logging export",
802                );
803            }
804        }
805
806        // Sanity check.
807        assert!(
808            log_index_ids.is_empty(),
809            "failed to create requested logging indexes: {log_index_ids:?}",
810        );
811
812        self.compute_state.compute_logger = Some(logger);
813    }
814
    /// Send progress information to the controller.
    ///
    /// For every tracked collection that is not a subscribe or copy-to, this computes the
    /// current write, output, and input frontiers. Each frontier that `allows_reporting`
    /// accepts relative to the previously reported one is recorded as reported and included in
    /// a `Frontiers` response. Collections without any reportable change produce no response.
    pub fn report_frontiers(&mut self) {
        // Responses are buffered and sent after the loop: the loop holds a mutable borrow of
        // the collections, while `send_compute_response` needs to borrow `self`.
        let mut responses = Vec::new();

        // Maintain a single allocation for `new_frontier` to avoid allocating on every iteration.
        let mut new_frontier = Antichain::new();

        for (&id, collection) in self.compute_state.collections.iter_mut() {
            // The compute protocol does not allow `Frontiers` responses for subscribe and copy-to
            // collections (database-issues#4701).
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers();

            // Collect the write frontier and check for progress.
            //
            // The write frontier comes either from the collection's trace or from its sink
            // write frontier; having both is treated as a bug, having neither is logged and
            // the collection is skipped.
            new_frontier.clear();
            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
                assert!(
                    collection.sink_write_frontier.is_none(),
                    "collection {id} has multiple frontiers"
                );
                traces.oks_mut().read_upper(&mut new_frontier);
            } else if let Some(frontier) = &collection.sink_write_frontier {
                new_frontier.clone_from(&frontier.borrow());
            } else {
                error!(id = ?id, "collection without write frontier");
                continue;
            }
            let new_write_frontier = reported
                .write_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Collect the output frontier and check for progress.
            //
            // By default, the output frontier equals the write frontier (which is still stored in
            // `new_frontier`). If the collection provides a compute frontier, we construct the
            // output frontier by taking the meet of write and compute frontier, to avoid:
            //  * reporting progress through times we have not yet written
            //  * reporting progress through times we have not yet fully processed, for
            //    collections that jump their write frontiers into the future
            //
            // As a special case, in read-only mode we don't take the write frontier into account.
            // The dataflow doesn't have the ability to push it forward, so it can't be used as a
            // measure of dataflow progress.
            if let Some(probe) = &collection.compute_probe {
                if *collection.read_only_rx.borrow() {
                    new_frontier.clear();
                }
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_output_frontier = reported
                .output_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Collect the input frontier and check for progress.
            //
            // The input frontier is assembled from the frontiers of all input probes.
            new_frontier.clear();
            for probe in collection.input_probes.values() {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_input_frontier = reported
                .input_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Remember what we are about to report, so the next call only reports changes.
            if let Some(frontier) = &new_write_frontier {
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_input_frontier {
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_output_frontier {
                collection
                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
            }

            let response = FrontiersResponse {
                write_frontier: new_write_frontier,
                input_frontier: new_input_frontier,
                output_frontier: new_output_frontier,
            };
            if response.has_updates() {
                responses.push((id, response));
            }
        }

        for (id, frontiers) in responses {
            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
        }
    }
910
911    /// Report per-worker metrics.
912    pub(crate) fn report_metrics(&self) {
913        if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
914            let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
915            let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
916            let remaining = expiration - now;
917            self.compute_state
918                .metrics
919                .replica_expiration_remaining_seconds
920                .set(remaining)
921        }
922    }
923
    /// Either complete the peek (and send the response) or put it in the pending set.
    ///
    /// `upper` is passed through to `IndexPeek::seek_fulfillment`; callers that process many
    /// peeks (see `process_peeks`) hand in the same antichain for every peek.
    ///
    /// An index peek whose status is `PeekStatus::UsePeekStash` is converted into a
    /// `PendingPeek::Stash` that is inserted into the pending set under the same UUID; no
    /// response is sent for it in this call.
    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
        // `Some(response)` if the peek can be answered now, `None` if it has to stay pending.
        let response = match &mut peek {
            PendingPeek::Index(peek) => {
                let start = Instant::now();

                let peek_stash_eligible = peek
                    .peek
                    .finishing
                    .is_streamable(peek.peek.result_desc.arity());

                // The stash is only usable when it is both enabled by configuration and a
                // persist location for it is known.
                let peek_stash_enabled = {
                    let enabled = ENABLE_PEEK_RESPONSE_STASH.get(&self.compute_state.worker_config);
                    let peek_persist_stash_available =
                        self.compute_state.peek_stash_persist_location.is_some();
                    if !peek_persist_stash_available && enabled {
                        error!("missing peek_stash_persist_location but peek stash is enabled");
                    }
                    enabled && peek_persist_stash_available
                };

                let peek_stash_threshold_bytes =
                    PEEK_RESPONSE_STASH_THRESHOLD_BYTES.get(&self.compute_state.worker_config);

                let metrics = IndexPeekMetrics {
                    seek_fulfillment_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_seek_fulfillment_seconds,
                    frontier_check_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_frontier_check_seconds,
                    error_scan_seconds: &self.compute_state.metrics.index_peek_error_scan_seconds,
                    cursor_setup_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_cursor_setup_seconds,
                    row_iteration_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_iteration_seconds,
                    result_sort_seconds: &self.compute_state.metrics.index_peek_result_sort_seconds,
                    row_collection_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_collection_seconds,
                };

                let status = peek.seek_fulfillment(
                    upper,
                    self.compute_state.max_result_size,
                    peek_stash_enabled && peek_stash_eligible,
                    peek_stash_threshold_bytes,
                    &metrics,
                );

                // Observed for every attempt, whether or not the peek became ready.
                self.compute_state
                    .metrics
                    .index_peek_total_seconds
                    .observe(start.elapsed().as_secs_f64());

                match status {
                    PeekStatus::Ready(result) => Some(result),
                    PeekStatus::NotReady => None,
                    PeekStatus::UsePeekStash => {
                        let _span =
                            span!(parent: &peek.span, Level::DEBUG, "process_stash_peek").entered();

                        let peek_stash_batch_max_runs = PEEK_RESPONSE_STASH_BATCH_MAX_RUNS
                            .get(&self.compute_state.worker_config);

                        // `UsePeekStash` is only expected when we passed `true` for the stash
                        // flag above, which requires the persist location to be present.
                        let stash_task = peek_stash::StashingPeek::start_upload(
                            Arc::clone(&self.compute_state.persist_clients),
                            self.compute_state
                                .peek_stash_persist_location
                                .as_ref()
                                .expect("verified above"),
                            peek.peek.clone(),
                            peek.trace_bundle.clone(),
                            peek_stash_batch_max_runs,
                        );

                        // Track the stash peek instead of the index peek from now on. The
                        // index peek is dropped when this function returns.
                        self.compute_state
                            .pending_peeks
                            .insert(peek.peek.uuid, PendingPeek::Stash(stash_task));
                        return;
                    }
                }
            }
            // Persist peeks are computed by a background task; poll its result channel without
            // blocking.
            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
                self.compute_state
                    .metrics
                    .persist_peek_seconds
                    .observe(duration.as_secs_f64());
                result
            }),
            PendingPeek::Stash(stashing_peek) => {
                let num_batches = PEEK_STASH_NUM_BATCHES.get(&self.compute_state.worker_config);
                let batch_size = PEEK_STASH_BATCH_SIZE.get(&self.compute_state.worker_config);
                stashing_peek.pump_rows(num_batches, batch_size);

                if let Ok((response, duration)) = stashing_peek.result.try_recv() {
                    self.compute_state
                        .metrics
                        .stashed_peek_seconds
                        .observe(duration.as_secs_f64());
                    trace!(?stashing_peek.peek, ?duration, "finished stashing peek response in persist");

                    Some(response)
                } else {
                    None
                }
            }
        };

        if let Some(response) = response {
            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek_response").entered();
            self.send_peek_response(peek, response)
        } else {
            // Not done yet: keep the peek around for the next round.
            let uuid = peek.peek().uuid;
            self.compute_state.pending_peeks.insert(uuid, peek);
        }
    }
1048
1049    /// Scan pending peeks and attempt to retire each.
1050    pub fn process_peeks(&mut self) {
1051        let mut upper = Antichain::new();
1052        let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
1053        for (_uuid, peek) in pending_peeks {
1054            self.process_peek(&mut upper, peek);
1055        }
1056    }
1057
1058    /// Sends a response for this peek's resolution to the coordinator.
1059    ///
1060    /// Note that this function takes ownership of the `PendingPeek`, which is
1061    /// meant to prevent multiple responses to the same peek.
1062    #[mz_ore::instrument(level = "debug")]
1063    fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
1064        let log_event = peek.as_log_event(false);
1065        // Respond with the response.
1066        self.send_compute_response(ComputeResponse::PeekResponse(
1067            peek.peek().uuid,
1068            response,
1069            OpenTelemetryContext::obtain(),
1070        ));
1071
1072        // Log responding to the peek request.
1073        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
1074            logger.log(&log_event);
1075        }
1076    }
1077
1078    /// Scan the shared subscribe response buffer, and forward results along.
1079    pub fn process_subscribes(&mut self) {
1080        let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
1081        for (sink_id, mut response) in subscribe_responses.drain(..) {
1082            // Update frontier logging for this subscribe.
1083            if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
1084                let new_frontier = match &response {
1085                    SubscribeResponse::Batch(b) => b.upper.clone(),
1086                    SubscribeResponse::DroppedAt(_) => Antichain::new(),
1087                };
1088
1089                let reported = collection.reported_frontiers();
1090                assert!(
1091                    reported.write_frontier.allows_reporting(&new_frontier),
1092                    "subscribe write frontier regression: {:?} -> {:?}",
1093                    reported.write_frontier,
1094                    new_frontier,
1095                );
1096                assert!(
1097                    reported.input_frontier.allows_reporting(&new_frontier),
1098                    "subscribe input frontier regression: {:?} -> {:?}",
1099                    reported.input_frontier,
1100                    new_frontier,
1101                );
1102
1103                collection
1104                    .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
1105                collection
1106                    .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
1107                collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
1108            } else {
1109                // Presumably tracking state for this subscribe was already dropped by
1110                // `drop_collection`. There is nothing left to do for logging.
1111            }
1112
1113            response
1114                .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
1115            self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
1116        }
1117    }
1118
1119    /// Scan the shared copy to response buffer, and forward results along.
1120    pub fn process_copy_tos(&self) {
1121        let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
1122        for (sink_id, response) in responses.drain(..) {
1123            self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
1124        }
1125    }
1126
1127    /// Send a response to the coordinator.
1128    fn send_compute_response(&self, response: ComputeResponse) {
1129        // Ignore send errors because the coordinator is free to ignore our
1130        // responses. This happens during shutdown.
1131        let _ = self.response_tx.send(response);
1132    }
1133
1134    /// Checks for dataflow expiration. Panics if we're past the replica expiration time.
1135    pub(crate) fn check_expiration(&self) {
1136        let now = mz_ore::now::SYSTEM_TIME();
1137        if self.compute_state.replica_expiration.less_than(&now.into()) {
1138            let now_datetime = mz_ore::now::to_datetime(now);
1139            let expiration_datetime = self
1140                .compute_state
1141                .replica_expiration
1142                .as_option()
1143                .map(Into::into)
1144                .map(mz_ore::now::to_datetime);
1145
1146            // We error and assert separately to produce structured logs in anything that depends
1147            // on tracing.
1148            error!(
1149                now,
1150                now_datetime = ?now_datetime,
1151                expiration = ?self.compute_state.replica_expiration.elements(),
1152                expiration_datetime = ?expiration_datetime,
1153                "replica expired"
1154            );
1155
1156            // Repeat condition for better error message.
1157            assert!(
1158                !self.compute_state.replica_expiration.less_than(&now.into()),
1159                "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
1160                self.compute_state.replica_expiration.elements(),
1161            );
1162        }
1163    }
1164
1165    /// Returns the dataflow expiration, i.e, the timestamp beyond which diffs can be
1166    /// dropped.
1167    ///
1168    /// Returns an empty timestamp if `replica_expiration` is unset or matches conditions under
1169    /// which dataflow expiration should be disabled.
1170    pub fn determine_dataflow_expiration(
1171        &self,
1172        time_dependence: &TimeDependence,
1173        until: &Antichain<Timestamp>,
1174    ) -> Antichain<Timestamp> {
1175        // Evaluate time dependence with respect to the expiration time.
1176        // * Step time forward to ensure the expiration time is different to the moment a dataflow
1177        //   can legitimately jump to.
1178        // * We cannot expire dataflow with an until that is less or equal to the expiration time.
1179        let iter = self
1180            .compute_state
1181            .replica_expiration
1182            .iter()
1183            .filter_map(|t| time_dependence.apply(*t))
1184            .filter_map(|t| Timestamp::try_step_forward(&t))
1185            .filter(|expiration| !until.less_equal(expiration));
1186        Antichain::from_iter(iter)
1187    }
1188}
1189
/// A peek that has been received but not yet responded to.
///
/// A pending peek targets either an index or a Persist collection. An index peek can
/// additionally be turned into a `Stash` peek, see the variant documentation.
///
/// Note that `PendingPeek` intentionally does not implement or derive `Clone`,
/// as each `PendingPeek` is meant to be dropped after it's responded to.
pub enum PendingPeek {
    /// A peek against an index. (Possibly a temporary index created for the purpose.)
    Index(IndexPeek),
    /// A peek against a Persist-backed collection.
    Persist(PersistPeek),
    /// A peek against an index that is being stashed in the peek stash by an
    /// async background task.
    Stash(peek_stash::StashingPeek),
}
1203
1204impl PendingPeek {
1205    /// Produces a corresponding log event.
1206    pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
1207        let peek = self.peek();
1208        let (id, peek_type) = match &peek.target {
1209            PeekTarget::Index { id } => (*id, logging::compute::PeekType::Index),
1210            PeekTarget::Persist { id, .. } => (*id, logging::compute::PeekType::Persist),
1211        };
1212        let uuid = peek.uuid.into_bytes();
1213        ComputeEvent::Peek(PeekEvent {
1214            id,
1215            time: peek.timestamp,
1216            uuid,
1217            peek_type,
1218            installed,
1219        })
1220    }
1221
1222    fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
1223        let empty_frontier = Antichain::new();
1224        let timestamp_frontier = Antichain::from_elem(peek.timestamp);
1225        trace_bundle
1226            .oks_mut()
1227            .set_logical_compaction(timestamp_frontier.borrow());
1228        trace_bundle
1229            .errs_mut()
1230            .set_logical_compaction(timestamp_frontier.borrow());
1231        trace_bundle
1232            .oks_mut()
1233            .set_physical_compaction(empty_frontier.borrow());
1234        trace_bundle
1235            .errs_mut()
1236            .set_physical_compaction(empty_frontier.borrow());
1237
1238        PendingPeek::Index(IndexPeek {
1239            peek,
1240            trace_bundle,
1241            span: tracing::Span::current(),
1242        })
1243    }
1244
1245    fn persist(
1246        peek: Peek,
1247        persist_clients: Arc<PersistClientCache>,
1248        metadata: CollectionMetadata,
1249        max_result_size: usize,
1250        timely_worker: &TimelyWorker,
1251    ) -> Self {
1252        let active_worker = {
1253            // Choose the worker that does the actual peek arbitrarily but consistently.
1254            let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
1255            chosen_index == timely_worker.index()
1256        };
1257        let activator = timely_worker.sync_activator_for([].into());
1258        let peek_uuid = peek.uuid;
1259
1260        let (result_tx, result_rx) = oneshot::channel();
1261        let timestamp = peek.timestamp;
1262        let mfp_plan = peek.map_filter_project.clone();
1263        let max_results_needed = peek
1264            .finishing
1265            .limit
1266            .map(|l| usize::cast_from(u64::from(l)))
1267            .unwrap_or(usize::MAX)
1268            + peek.finishing.offset;
1269        let order_by = peek.finishing.order_by.clone();
1270
1271        // Persist peeks can include at most one literal constraint.
1272        let literal_constraint = peek
1273            .literal_constraints
1274            .clone()
1275            .map(|rows| rows.into_element());
1276
1277        let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
1278            let start = Instant::now();
1279            let result = if active_worker {
1280                PersistPeek::do_peek(
1281                    &persist_clients,
1282                    metadata,
1283                    timestamp,
1284                    literal_constraint,
1285                    mfp_plan,
1286                    max_result_size,
1287                    max_results_needed,
1288                )
1289                .await
1290            } else {
1291                Ok(vec![])
1292            };
1293            let result = match result {
1294                Ok(rows) => PeekResponse::Rows(vec![RowCollection::new(rows, &order_by)]),
1295                Err(e) => PeekResponse::Error(e.to_string()),
1296            };
1297            match result_tx.send((result, start.elapsed())) {
1298                Ok(()) => {}
1299                Err((_result, elapsed)) => {
1300                    debug!(duration =? elapsed, "dropping result for cancelled peek {peek_uuid}")
1301                }
1302            }
1303            match activator.activate() {
1304                Ok(()) => {}
1305                Err(_) => {
1306                    debug!("unable to wake timely after completed peek {peek_uuid}");
1307                }
1308            }
1309        });
1310        PendingPeek::Persist(PersistPeek {
1311            peek,
1312            _abort_handle: task_handle.abort_on_drop(),
1313            result: result_rx,
1314            span: tracing::Span::current(),
1315        })
1316    }
1317
1318    fn span(&self) -> &tracing::Span {
1319        match self {
1320            PendingPeek::Index(p) => &p.span,
1321            PendingPeek::Persist(p) => &p.span,
1322            PendingPeek::Stash(p) => &p.span,
1323        }
1324    }
1325
1326    pub(crate) fn peek(&self) -> &Peek {
1327        match self {
1328            PendingPeek::Index(p) => &p.peek,
1329            PendingPeek::Persist(p) => &p.peek,
1330            PendingPeek::Stash(p) => &p.peek,
1331        }
1332    }
1333}
1334
/// An in-progress Persist peek.
///
/// Like `PendingPeek`, `PersistPeek` does not implement or derive `Clone`,
/// as each peek is meant to be dropped after it's responded to.
pub struct PersistPeek {
    /// The peek command being processed.
    pub(crate) peek: Peek,
    /// A background task that's responsible for producing the peek results.
    /// If we're no longer interested in the results, we abort the task.
    _abort_handle: AbortOnDropHandle<()>,
    /// The result of the background task, eventually.
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    /// The `tracing::Span` tracking this peek's operation
    span: tracing::Span,
}
1349
1350impl PersistPeek {
1351    async fn do_peek(
1352        persist_clients: &PersistClientCache,
1353        metadata: CollectionMetadata,
1354        as_of: Timestamp,
1355        literal_constraint: Option<Row>,
1356        mfp_plan: SafeMfpPlan,
1357        max_result_size: usize,
1358        mut limit_remaining: usize,
1359    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
1360        let client = persist_clients
1361            .open(metadata.persist_location)
1362            .await
1363            .map_err(|e| e.to_string())?;
1364
1365        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
1366            .open_leased_reader(
1367                metadata.data_shard,
1368                Arc::new(metadata.relation_desc.clone()),
1369                Arc::new(UnitSchema),
1370                Diagnostics::from_purpose("persist::peek"),
1371                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
1372            )
1373            .await
1374            .map_err(|e| e.to_string())?;
1375
1376        // If we are using txn-wal for this collection, then the upper might
1377        // be advanced lazily and we have to go through txn-wal for reads.
1378        //
1379        // TODO: If/when we have a process-wide TxnsRead worker for clusterd,
1380        // use in here (instead of opening a new TxnsCache) to save a persist
1381        // reader registration and some txns shard read traffic.
1382        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
1383            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
1384        } else {
1385            None
1386        };
1387
1388        let metrics = client.metrics();
1389
1390        let mut cursor = StatsCursor::new(
1391            &mut reader,
1392            txns_read.as_mut(),
1393            metrics,
1394            &mfp_plan,
1395            &metadata.relation_desc,
1396            Antichain::from_elem(as_of),
1397        )
1398        .await
1399        .map_err(|since| {
1400            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
1401        })?;
1402
1403        // Re-used state for processing and building rows.
1404        let mut result = vec![];
1405        let mut datum_vec = DatumVec::new();
1406        let mut row_builder = Row::default();
1407        let arena = RowArena::new();
1408        let mut total_size = 0usize;
1409
1410        let literal_len = match &literal_constraint {
1411            None => 0,
1412            Some(row) => row.iter().count(),
1413        };
1414
1415        'collect: while limit_remaining > 0 {
1416            let Some(batch) = cursor.next().await else {
1417                break;
1418            };
1419            for (data, _, d) in batch {
1420                let row = data.map_err(|e| e.to_string())?;
1421
1422                if let Some(literal) = &literal_constraint {
1423                    match row.iter().take(literal_len).cmp(literal.iter()) {
1424                        Ordering::Less => continue,
1425                        Ordering::Equal => {}
1426                        Ordering::Greater => break 'collect,
1427                    }
1428                }
1429
1430                let count: usize = d.try_into().map_err(|_| {
1431                    error!(
1432                        shard = %metadata.data_shard, diff = d, ?row,
1433                        "persist peek encountered negative multiplicities",
1434                    );
1435                    format!(
1436                        "Invalid data in source, \
1437                         saw retractions ({}) for row that does not exist: {:?}",
1438                        -d, row,
1439                    )
1440                })?;
1441                let Some(count) = NonZeroUsize::new(count) else {
1442                    continue;
1443                };
1444                let mut datum_local = datum_vec.borrow_with(&row);
1445                let eval_result = mfp_plan
1446                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
1447                    .map(|row| row.cloned())
1448                    .map_err(|e| e.to_string())?;
1449                if let Some(row) = eval_result {
1450                    total_size = total_size
1451                        .saturating_add(row.byte_len())
1452                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
1453                    if total_size > max_result_size {
1454                        return Err(format!(
1455                            "result exceeds max size of {}",
1456                            ByteSize::b(u64::cast_from(max_result_size))
1457                        ));
1458                    }
1459                    result.push((row, count));
1460                    limit_remaining = limit_remaining.saturating_sub(count.get());
1461                    if limit_remaining == 0 {
1462                        break;
1463                    }
1464                }
1465            }
1466        }
1467
1468        Ok(result)
1469    }
1470}
1471
/// An in-progress index-backed peek, and data to eventually fulfill it.
pub struct IndexPeek {
    /// The peek request being served. Its target id, timestamp, literal
    /// constraints, map-filter-project and finishing are consulted when the
    /// peek is fulfilled.
    peek: Peek,
    /// The data from which the trace derives.
    ///
    /// Provides access to the ok trace, the error trace, and the compaction
    /// frontier that are read while fulfilling the peek.
    trace_bundle: TraceBundle,
    /// The `tracing::Span` tracking this peek's operation
    span: tracing::Span,
}
1480
/// Histogram metrics for index peek phases.
///
/// This struct bundles references to the various histogram metrics used to
/// instrument the index peek processing pipeline.
///
/// Most histograms are only observed when the corresponding phase runs to
/// completion; phases that bail out early (peek not ready, errors, result too
/// large, stash hand-off) generally record nothing.
pub(crate) struct IndexPeekMetrics<'a> {
    /// Total time of a `seek_fulfillment` call that got as far as collecting
    /// data (frontier checks plus collection).
    pub seek_fulfillment_seconds: &'a prometheus::Histogram,
    /// Time spent checking the trace uppers and the compaction frontier,
    /// observed only when all of those checks pass.
    pub frontier_check_seconds: &'a prometheus::Histogram,
    /// Time spent scanning the error trace, observed only when no error was
    /// found.
    pub error_scan_seconds: &'a prometheus::Histogram,
    /// Time spent constructing the peek result iterator over the ok trace.
    pub cursor_setup_seconds: &'a prometheus::Histogram,
    /// Wall-clock time of the row iteration loop, including any intermediate
    /// sorting done inside of it.
    pub row_iteration_seconds: &'a prometheus::Histogram,
    /// Accumulated time spent sorting intermediate results while thinning
    /// them down to the number of rows needed.
    pub result_sort_seconds: &'a prometheus::Histogram,
    /// Time spent building the final `RowCollection` from the gathered rows.
    pub row_collection_seconds: &'a prometheus::Histogram,
}
1494
1495impl IndexPeek {
1496    /// Attempts to fulfill the peek and reports success.
1497    ///
1498    /// To produce output at `peek.timestamp`, we must be certain that
1499    /// it is no longer changing. A trace guarantees that all future
1500    /// changes will be greater than or equal to an element of `upper`.
1501    ///
1502    /// If an element of `upper` is less or equal to `peek.timestamp`,
1503    /// then there can be further updates that would change the output.
1504    /// If no element of `upper` is less or equal to `peek.timestamp`,
1505    /// then for any time `t` less or equal to `peek.timestamp` it is
1506    /// not the case that `upper` is less or equal to that timestamp,
1507    /// and so the result cannot further evolve.
1508    fn seek_fulfillment(
1509        &mut self,
1510        upper: &mut Antichain<Timestamp>,
1511        max_result_size: u64,
1512        peek_stash_eligible: bool,
1513        peek_stash_threshold_bytes: usize,
1514        metrics: &IndexPeekMetrics<'_>,
1515    ) -> PeekStatus {
1516        let method_start = Instant::now();
1517
1518        self.trace_bundle.oks_mut().read_upper(upper);
1519        if upper.less_equal(&self.peek.timestamp) {
1520            return PeekStatus::NotReady;
1521        }
1522        self.trace_bundle.errs_mut().read_upper(upper);
1523        if upper.less_equal(&self.peek.timestamp) {
1524            return PeekStatus::NotReady;
1525        }
1526
1527        let read_frontier = self.trace_bundle.compaction_frontier();
1528        if !read_frontier.less_equal(&self.peek.timestamp) {
1529            let error = format!(
1530                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
1531                read_frontier.elements(),
1532                self.peek.timestamp,
1533            );
1534            return PeekStatus::Ready(PeekResponse::Error(error));
1535        }
1536
1537        metrics
1538            .frontier_check_seconds
1539            .observe(method_start.elapsed().as_secs_f64());
1540
1541        let result = self.collect_finished_data(
1542            max_result_size,
1543            peek_stash_eligible,
1544            peek_stash_threshold_bytes,
1545            metrics,
1546        );
1547
1548        metrics
1549            .seek_fulfillment_seconds
1550            .observe(method_start.elapsed().as_secs_f64());
1551
1552        result
1553    }
1554
1555    /// Collects data for a known-complete peek from the ok stream.
1556    fn collect_finished_data(
1557        &mut self,
1558        max_result_size: u64,
1559        peek_stash_eligible: bool,
1560        peek_stash_threshold_bytes: usize,
1561        metrics: &IndexPeekMetrics<'_>,
1562    ) -> PeekStatus {
1563        let error_scan_start = Instant::now();
1564
1565        // Check if there exist any errors and, if so, return whatever one we
1566        // find first.
1567        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
1568        while cursor.key_valid(&storage) {
1569            let mut copies = Diff::ZERO;
1570            cursor.map_times(&storage, |time, diff| {
1571                if time.less_equal(&self.peek.timestamp) {
1572                    copies += diff;
1573                }
1574            });
1575            if copies.is_negative() {
1576                let error = cursor.key(&storage);
1577                error!(
1578                    target = %self.peek.target.id(), diff = %copies, %error,
1579                    "index peek encountered negative multiplicities in error trace",
1580                );
1581                return PeekStatus::Ready(PeekResponse::Error(format!(
1582                    "Invalid data in source errors, \
1583                    saw retractions ({}) for row that does not exist: {}",
1584                    -copies, error,
1585                )));
1586            }
1587            if copies.is_positive() {
1588                return PeekStatus::Ready(PeekResponse::Error(cursor.key(&storage).to_string()));
1589            }
1590            cursor.step_key(&storage);
1591        }
1592
1593        metrics
1594            .error_scan_seconds
1595            .observe(error_scan_start.elapsed().as_secs_f64());
1596
1597        Self::collect_ok_finished_data(
1598            &self.peek,
1599            self.trace_bundle.oks_mut(),
1600            max_result_size,
1601            peek_stash_eligible,
1602            peek_stash_threshold_bytes,
1603            metrics,
1604        )
1605    }
1606
1607    /// Collects data for a known-complete peek from the ok stream.
1608    fn collect_ok_finished_data<Tr>(
1609        peek: &Peek,
1610        oks_handle: &mut Tr,
1611        max_result_size: u64,
1612        peek_stash_eligible: bool,
1613        peek_stash_threshold_bytes: usize,
1614        metrics: &IndexPeekMetrics<'_>,
1615    ) -> PeekStatus
1616    where
1617        for<'a> Tr: TraceReader<
1618                Key<'a>: ExtendDatums + Eq,
1619                KeyContainer: BatchContainer<Owned = Row>,
1620                Val<'a>: ExtendDatums,
1621                TimeGat<'a>: PartialOrder<Timestamp>,
1622                DiffGat<'a> = &'a Diff,
1623            >,
1624    {
1625        let max_result_size = usize::cast_from(max_result_size);
1626        let count_byte_size = size_of::<NonZeroUsize>();
1627
1628        // Cursor setup timing
1629        let cursor_setup_start = Instant::now();
1630
1631        // We clone `literal_constraints` here because we don't want to move the constraints
1632        // out of the peek struct, and don't want to modify in-place.
1633        let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
1634            peek.target.id().clone(),
1635            peek.map_filter_project.clone(),
1636            peek.timestamp,
1637            peek.literal_constraints.clone().as_deref_mut(),
1638            oks_handle,
1639        );
1640
1641        metrics
1642            .cursor_setup_seconds
1643            .observe(cursor_setup_start.elapsed().as_secs_f64());
1644
1645        // Accumulated `Vec<(row, count)>` results that we are likely to return.
1646        let mut results = Vec::new();
1647        let mut total_size: usize = 0;
1648
1649        // When set, a bound on the number of records we need to return.
1650        // The requirements on the records are driven by the finishing's
1651        // `order_by` field. Further limiting will happen when the results
1652        // are collected, so we don't need to have exactly this many results,
1653        // just at least those results that would have been returned.
1654        let max_results = peek.finishing.num_rows_needed();
1655
1656        let comparator = RowComparator::new(peek.finishing.order_by.as_slice());
1657
1658        // Row iteration timing
1659        let row_iteration_start = Instant::now();
1660        let mut sort_time_accum = Duration::ZERO;
1661
1662        while let Some(row) = peek_iterator.next() {
1663            let row: (Row, _) = match row {
1664                Ok(row) => row,
1665                Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)),
1666            };
1667            let (row, copies) = row;
1668            let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");
1669
1670            total_size = total_size
1671                .saturating_add(row.byte_len())
1672                .saturating_add(count_byte_size);
1673            if peek_stash_eligible && total_size > peek_stash_threshold_bytes {
1674                return PeekStatus::UsePeekStash;
1675            }
1676            if total_size > max_result_size {
1677                return PeekStatus::Ready(PeekResponse::Error(format!(
1678                    "result exceeds max size of {}",
1679                    ByteSize::b(u64::cast_from(max_result_size))
1680                )));
1681            }
1682
1683            results.push((row, copies));
1684
1685            // If we hold many more than `max_results` records, we can thin down
1686            // `results` using `self.finishing.ordering`.
1687            if let Some(max_results) = max_results {
1688                // We use a threshold twice what we intend, to amortize the work
1689                // across all of the insertions. We could tighten this, but it
1690                // works for the moment.
1691                if results.len() >= 2 * max_results {
1692                    if peek.finishing.order_by.is_empty() {
1693                        results.truncate(max_results);
1694                        metrics
1695                            .row_iteration_seconds
1696                            .observe(row_iteration_start.elapsed().as_secs_f64());
1697                        metrics
1698                            .result_sort_seconds
1699                            .observe(sort_time_accum.as_secs_f64());
1700                        let row_collection_start = Instant::now();
1701                        let collection = RowCollection::new(results, &peek.finishing.order_by);
1702                        metrics
1703                            .row_collection_seconds
1704                            .observe(row_collection_start.elapsed().as_secs_f64());
1705                        return PeekStatus::Ready(PeekResponse::Rows(vec![collection]));
1706                    } else {
1707                        // We can sort `results` and then truncate to `max_results`.
1708                        // This has an effect similar to a priority queue, without
1709                        // its interactive dequeueing properties.
1710                        // TODO: Had we left these as `Vec<Datum>` we would avoid
1711                        // the unpacking; we should consider doing that, although
1712                        // it will require a re-pivot of the code to branch on this
1713                        // inner test (as we prefer not to maintain `Vec<Datum>`
1714                        // in the other case).
1715                        let sort_start = Instant::now();
1716                        results.sort_by(|left, right| {
1717                            comparator.compare_rows(&left.0, &right.0, || left.0.cmp(&right.0))
1718                        });
1719                        sort_time_accum += sort_start.elapsed();
1720                        let dropped = results.drain(max_results..);
1721                        let dropped_size =
1722                            dropped
1723                                .into_iter()
1724                                .fold(0, |acc: usize, (row, _count): (Row, _)| {
1725                                    acc.saturating_add(
1726                                        row.byte_len().saturating_add(count_byte_size),
1727                                    )
1728                                });
1729                        total_size = total_size.saturating_sub(dropped_size);
1730                    }
1731                }
1732            }
1733        }
1734
1735        metrics
1736            .row_iteration_seconds
1737            .observe(row_iteration_start.elapsed().as_secs_f64());
1738        metrics
1739            .result_sort_seconds
1740            .observe(sort_time_accum.as_secs_f64());
1741
1742        let row_collection_start = Instant::now();
1743        let collection = RowCollection::new(results, &peek.finishing.order_by);
1744        metrics
1745            .row_collection_seconds
1746            .observe(row_collection_start.elapsed().as_secs_f64());
1747        PeekStatus::Ready(PeekResponse::Rows(vec![collection]))
1748    }
1749}
1750
/// For keeping track of the state of pending or ready peeks, and managing
/// control flow.
///
/// This is the return type of the `IndexPeek` fulfillment methods.
enum PeekStatus {
    /// The frontiers of objects are not yet advanced enough, peek is still
    /// pending.
    NotReady,
    /// The result size is above the configured threshold and the peek is
    /// eligible for using the peek result stash.
    UsePeekStash,
    /// The peek result is ready.
    ///
    /// The contained response may carry rows or an error.
    Ready(PeekResponse),
}
1763
/// The frontiers we have reported to the controller for a collection.
///
/// Each kind of frontier (write, input, output) is tracked separately.
#[derive(Debug)]
struct ReportedFrontiers {
    /// The reported write frontier.
    write_frontier: ReportedFrontier,
    /// The reported input frontier.
    input_frontier: ReportedFrontier,
    /// The reported output frontier.
    output_frontier: ReportedFrontier,
}
1774
1775impl ReportedFrontiers {
1776    /// Creates a new `ReportedFrontiers` instance.
1777    fn new() -> Self {
1778        Self {
1779            write_frontier: ReportedFrontier::new(),
1780            input_frontier: ReportedFrontier::new(),
1781            output_frontier: ReportedFrontier::new(),
1782        }
1783    }
1784}
1785
/// A frontier we have reported to the controller, or the least frontier we are allowed to report.
#[derive(Clone, Debug)]
pub enum ReportedFrontier {
    /// A frontier has been previously reported.
    ///
    /// Holds the frontier that was reported.
    Reported(Antichain<Timestamp>),
    /// No frontier has been reported yet.
    NotReported {
        /// A lower bound for frontiers that may be reported in the future.
        lower: Antichain<Timestamp>,
    },
}
1797
1798impl ReportedFrontier {
1799    /// Create a new `ReportedFrontier` enforcing the minimum lower bound.
1800    pub fn new() -> Self {
1801        let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
1802        Self::NotReported { lower }
1803    }
1804
1805    /// Whether the reported frontier is the empty frontier.
1806    pub fn is_empty(&self) -> bool {
1807        match self {
1808            Self::Reported(frontier) => frontier.is_empty(),
1809            Self::NotReported { .. } => false,
1810        }
1811    }
1812
1813    /// Whether this `ReportedFrontier` allows reporting the given frontier.
1814    ///
1815    /// A `ReportedFrontier` allows reporting of another frontier if:
1816    ///  * The other frontier is greater than the reported frontier.
1817    ///  * The other frontier is greater than or equal to the lower bound.
1818    fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
1819        match self {
1820            Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
1821            Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
1822        }
1823    }
1824}
1825
/// State maintained for a compute collection.
pub struct CollectionState {
    /// Tracks the frontiers that have been reported to the controller.
    reported_frontiers: ReportedFrontiers,
    /// The index of the dataflow computing this collection.
    ///
    /// Used for dropping the dataflow when the collection is dropped.
    /// The dataflow index is wrapped in an `Rc` and can be shared between collections, to reflect
    /// the possibility that a single dataflow can export multiple collections.
    dataflow_index: Rc<usize>,
    /// Whether this collection is a subscribe or copy-to.
    ///
    /// The compute protocol does not allow `Frontiers` responses for subscribe and copy-to
    /// collections, so we need to be able to recognize them. This is something we would like to
    /// change in the future (database-issues#4701).
    pub is_subscribe_or_copy: bool,
    /// The collection's initial as-of frontier.
    ///
    /// Used to determine hydration status: the collection counts as hydrated once a reported
    /// output frontier is strictly beyond this frontier.
    as_of: Antichain<Timestamp>,

    /// A token that should be dropped when this collection is dropped to clean up associated
    /// sink state.
    ///
    /// Only `Some` if the collection is a sink.
    pub sink_token: Option<SinkToken>,
    /// Frontier of sink writes.
    ///
    /// Only `Some` if the collection is a sink and *not* a subscribe.
    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
    /// Frontier probes for every input to the collection.
    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
    /// A probe reporting the frontier of times through which all collection outputs have been
    /// computed (but not necessarily written).
    ///
    /// `None` for collections with compute frontiers equal to their write frontiers.
    pub compute_probe: Option<probe::Handle<Timestamp>>,
    /// Logging state maintained for this collection.
    ///
    /// When `None`, frontier and hydration changes are not logged.
    logging: Option<CollectionLogging>,
    /// Metrics tracked for this collection.
    metrics: CollectionMetrics,
    /// Send-side to transition a dataflow from read-only mode to read-write mode.
    ///
    /// All dataflows start in read-only mode. Only after receiving a
    /// `AllowWrites` command from the controller will they transition to
    /// read-write mode.
    ///
    /// A dataflow in read-only mode must not affect any external state.
    ///
    /// NOTE: In the future, we might want a more complicated flag, for example
    /// something that tells us after which timestamp we are allowed to write.
    /// In this first version we are keeping things as simple as possible!
    read_only_tx: watch::Sender<bool>,
    /// Receive-side to observe whether a dataflow is in read-only mode.
    pub read_only_rx: watch::Receiver<bool>,
}
1882
1883impl CollectionState {
1884    fn new(
1885        dataflow_index: Rc<usize>,
1886        is_subscribe_or_copy: bool,
1887        as_of: Antichain<Timestamp>,
1888        metrics: CollectionMetrics,
1889    ) -> Self {
1890        // We always initialize as read_only=true. Only when we're explicitly
1891        // allowed to we switch to read-write.
1892        let (read_only_tx, read_only_rx) = watch::channel(true);
1893
1894        Self {
1895            reported_frontiers: ReportedFrontiers::new(),
1896            dataflow_index,
1897            is_subscribe_or_copy,
1898            as_of,
1899            sink_token: None,
1900            sink_write_frontier: None,
1901            input_probes: Default::default(),
1902            compute_probe: None,
1903            logging: None,
1904            metrics,
1905            read_only_tx,
1906            read_only_rx,
1907        }
1908    }
1909
1910    /// Return the frontiers that have been reported to the controller.
1911    fn reported_frontiers(&self) -> &ReportedFrontiers {
1912        &self.reported_frontiers
1913    }
1914
1915    /// Reset all reported frontiers to the given value.
1916    pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
1917        self.reported_frontiers.write_frontier = frontier.clone();
1918        self.reported_frontiers.input_frontier = frontier.clone();
1919        self.reported_frontiers.output_frontier = frontier;
1920    }
1921
1922    /// Set the write frontier that has been reported to the controller.
1923    fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
1924        if let Some(logging) = &mut self.logging {
1925            let time = match &frontier {
1926                ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
1927                ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
1928            };
1929            logging.set_frontier(time);
1930        }
1931
1932        self.reported_frontiers.write_frontier = frontier;
1933    }
1934
1935    /// Set the input frontier that has been reported to the controller.
1936    fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
1937        // Use this opportunity to update our input frontier logging.
1938        if let Some(logging) = &mut self.logging {
1939            for (id, probe) in &self.input_probes {
1940                let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
1941                logging.set_import_frontier(*id, new_time);
1942            }
1943        }
1944
1945        self.reported_frontiers.input_frontier = frontier;
1946    }
1947
1948    /// Set the output frontier that has been reported to the controller.
1949    fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
1950        let already_hydrated = self.hydrated();
1951
1952        self.reported_frontiers.output_frontier = frontier;
1953
1954        if !already_hydrated && self.hydrated() {
1955            if let Some(logging) = &mut self.logging {
1956                logging.set_hydrated();
1957            }
1958            self.metrics.record_collection_hydrated();
1959        }
1960    }
1961
1962    /// Return whether this collection is hydrated.
1963    fn hydrated(&self) -> bool {
1964        match &self.reported_frontiers.output_frontier {
1965            ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
1966            ReportedFrontier::NotReported { .. } => false,
1967        }
1968    }
1969
1970    /// Allow writes for this collection.
1971    fn allow_writes(&self) {
1972        info!(
1973            dataflow_index = *self.dataflow_index,
1974            export = ?self.logging.as_ref().map(|l| l.export_id()),
1975            "allowing writes for dataflow",
1976        );
1977        let _ = self.read_only_tx.send(false);
1978    }
1979}