// mz_compute/compute_state.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Worker-local state for compute timely instances.
7
8use std::any::Any;
9use std::cell::RefCell;
10use std::cmp::Ordering;
11use std::collections::{BTreeMap, BTreeSet};
12use std::num::NonZeroUsize;
13use std::rc::Rc;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use bytesize::ByteSize;
18use differential_dataflow::Hashable;
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::implementations::BatchContainer;
21use differential_dataflow::trace::{Cursor, TraceReader};
22use mz_compute_client::logging::LoggingConfig;
23use mz_compute_client::protocol::command::{
24    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
25};
26use mz_compute_client::protocol::history::ComputeCommandHistory;
27use mz_compute_client::protocol::response::{
28    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, SubscribeResponse,
29};
30use mz_compute_types::dataflows::DataflowDescription;
31use mz_compute_types::dyncfgs::{
32    ENABLE_PEEK_RESPONSE_STASH, PEEK_RESPONSE_STASH_BATCH_MAX_RUNS,
33    PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE, PEEK_STASH_NUM_BATCHES,
34};
35use mz_compute_types::plan::render_plan::RenderPlan;
36use mz_dyncfg::ConfigSet;
37use mz_expr::row::RowCollection;
38use mz_expr::{RowComparator, SafeMfpPlan};
39use mz_ore::cast::CastFrom;
40use mz_ore::collections::CollectionExt;
41use mz_ore::metrics::{MetricsRegistry, UIntGauge};
42use mz_ore::now::EpochMillis;
43use mz_ore::soft_panic_or_log;
44use mz_ore::task::AbortOnDropHandle;
45use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
46use mz_persist_client::Diagnostics;
47use mz_persist_client::cache::PersistClientCache;
48use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
49use mz_persist_client::read::ReadHandle;
50use mz_persist_types::PersistLocation;
51use mz_persist_types::codec_impls::UnitSchema;
52use mz_repr::fixed_length::ToDatumIter;
53use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
54use mz_storage_operators::stats::StatsCursor;
55use mz_storage_types::StorageDiff;
56use mz_storage_types::controller::CollectionMetadata;
57use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
58use mz_storage_types::sources::SourceData;
59use mz_storage_types::time_dependence::TimeDependence;
60use mz_txn_wal::operator::TxnsContext;
61use mz_txn_wal::txn_cache::TxnsCache;
62use timely::dataflow::operators::probe;
63use timely::order::PartialOrder;
64use timely::progress::frontier::Antichain;
65use timely::worker::Worker as TimelyWorker;
66use tokio::sync::{oneshot, watch};
67use tracing::{Level, debug, error, info, span, trace, warn};
68use uuid::Uuid;
69
70use crate::arrangement::manager::{TraceBundle, TraceManager};
71use crate::logging;
72use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
73use crate::logging::initialize::LoggingTraces;
74use crate::metrics::{CollectionMetrics, WorkerMetrics};
75use crate::render::{LinearJoinSpec, StartSignal};
76use crate::server::{ComputeInstanceContext, ResponseSender};
77
78mod peek_result_iterator;
79mod peek_stash;
80
/// Worker-local state that is maintained across dataflows.
///
/// This state is restricted to the COMPUTE state, the deterministic, idempotent work
/// done between data ingress and egress.
pub struct ComputeState {
    /// State kept for each installed compute collection.
    ///
    /// Each collection has exactly one frontier.
    /// How the frontier is communicated depends on the collection type:
    ///  * Frontiers of indexes are equal to the frontier of their corresponding traces in the
    ///    `TraceManager`.
    ///  * Persist sinks store their current frontier in `CollectionState::sink_write_frontier`.
    ///  * Subscribes report their frontiers through the `subscribe_response_buffer`.
    pub collections: BTreeMap<GlobalId, CollectionState>,
    /// The traces available for sharing across dataflows.
    pub traces: TraceManager,
    /// Shared buffer with SUBSCRIBE operator instances by which they can respond.
    ///
    /// The entries are pairs of sink identifier (to identify the subscribe instance)
    /// and the response itself.
    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
    /// Shared buffer with S3 oneshot operator instances by which they can respond.
    ///
    /// The entries are pairs of sink identifier (to identify the s3 oneshot instance)
    /// and the response itself.
    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
    /// Peek commands that are awaiting fulfillment.
    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// The persist location where we can stash large peek results.
    pub peek_stash_persist_location: Option<PersistLocation>,
    /// The logger, from Timely's logging framework, if logs are enabled.
    pub compute_logger: Option<logging::compute::Logger>,
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    /// This is intentionally shared between workers.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// History of commands received by this workers and all its peers.
    pub command_history: ComputeCommandHistory<UIntGauge>,
    /// Max size in bytes of any result.
    ///
    /// Starts at `u64::MAX` and is updated through `ComputeParameters` in
    /// `handle_update_configuration`.
    max_result_size: u64,
    /// Specification for rendering linear joins.
    pub linear_join_spec: LinearJoinSpec,
    /// Metrics for this worker.
    pub metrics: WorkerMetrics,
    /// A process-global handle to tracing configuration.
    tracing_handle: Arc<TracingHandle>,
    /// Other configuration for compute
    pub context: ComputeInstanceContext,
    /// Per-worker dynamic configuration.
    ///
    /// This is separate from the process-global `ConfigSet` and contains config options that need
    /// to be applied consistently with compute command order.
    ///
    /// For example, for options that influence dataflow rendering it is important that all workers
    /// render the same dataflow with the same options. If these options were stored in a global
    /// `ConfigSet`, we couldn't guarantee that all workers observe changes to them at the same
    /// point in the stream of compute commands. Storing per-worker configuration ensures that
    /// because each worker's configuration is only updated once that worker observes the
    /// respective `UpdateConfiguration` command.
    ///
    /// Reference-counted to avoid cloning for `Context`.
    pub worker_config: Rc<ConfigSet>,

    /// The process-global metrics registry.
    pub metrics_registry: MetricsRegistry,

    /// The number of timely workers per process.
    pub workers_per_process: usize,

    /// Collections awaiting schedule instruction by the controller.
    ///
    /// Each entry stores a reference to a token that can be dropped to unsuspend the collection's
    /// dataflow. Multiple collections can reference the same token if they are exported by the
    /// same dataflow.
    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,

    /// Interval at which to perform server maintenance tasks. Set to a zero interval to
    /// perform maintenance with every `step_or_park` invocation.
    pub server_maintenance_interval: Duration,

    /// The [`mz_ore::now::SYSTEM_TIME`] at which the replica was started.
    ///
    /// Used to compute `replica_expiration`.
    pub init_system_time: EpochMillis,

    /// The maximum time for which the replica is expected to live. If not empty, dataflows in the
    /// replica can drop diffs associated with timestamps beyond the replica expiration.
    /// The replica will panic if such dataflows are not dropped before the replica has expired.
    pub replica_expiration: Antichain<Timestamp>,
}
172
173impl ComputeState {
174    /// Construct a new `ComputeState`.
175    pub fn new(
176        persist_clients: Arc<PersistClientCache>,
177        txns_ctx: TxnsContext,
178        metrics: WorkerMetrics,
179        tracing_handle: Arc<TracingHandle>,
180        context: ComputeInstanceContext,
181        metrics_registry: MetricsRegistry,
182        workers_per_process: usize,
183    ) -> Self {
184        let traces = TraceManager::new(metrics.clone());
185        let command_history = ComputeCommandHistory::new(metrics.for_history());
186
187        Self {
188            collections: Default::default(),
189            traces,
190            subscribe_response_buffer: Default::default(),
191            copy_to_response_buffer: Default::default(),
192            pending_peeks: Default::default(),
193            peek_stash_persist_location: None,
194            compute_logger: None,
195            persist_clients,
196            txns_ctx,
197            command_history,
198            max_result_size: u64::MAX,
199            linear_join_spec: Default::default(),
200            metrics,
201            tracing_handle,
202            context,
203            worker_config: mz_dyncfgs::all_dyncfgs().into(),
204            metrics_registry,
205            workers_per_process,
206            suspended_collections: Default::default(),
207            server_maintenance_interval: Duration::ZERO,
208            init_system_time: mz_ore::now::SYSTEM_TIME(),
209            replica_expiration: Antichain::default(),
210        }
211    }
212
213    /// Return a mutable reference to the identified collection.
214    ///
215    /// Panics if the collection doesn't exist.
216    pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
217        self.collections
218            .get_mut(&id)
219            .expect("collection must exist")
220    }
221
222    /// Construct a new frontier probe for the given input and add it to the state of the given
223    /// collections.
224    ///
225    /// The caller is responsible for attaching the returned probe handle to the respective
226    /// dataflow input stream.
227    pub fn input_probe_for(
228        &mut self,
229        input_id: GlobalId,
230        collection_ids: impl Iterator<Item = GlobalId>,
231    ) -> probe::Handle<Timestamp> {
232        let probe = probe::Handle::default();
233        for id in collection_ids {
234            if let Some(collection) = self.collections.get_mut(&id) {
235                collection.input_probes.insert(input_id, probe.clone());
236            }
237        }
238        probe
239    }
240
241    /// Apply the current `worker_config` to the compute state.
242    fn apply_worker_config(&mut self) {
243        use mz_compute_types::dyncfgs::*;
244
245        let config = &self.worker_config;
246
247        self.linear_join_spec = LinearJoinSpec::from_config(config);
248
249        if ENABLE_LGALLOC.get(config) {
250            if let Some(path) = &self.context.scratch_directory {
251                let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
252                let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
253                let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
254                let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
255                let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
256                info!(
257                    ?path,
258                    backgrund_interval=?interval,
259                    clear_bytes,
260                    eager_return,
261                    file_growth_dampener,
262                    local_buffer_bytes,
263                    "enabling lgalloc"
264                );
265                let background_worker_config = lgalloc::BackgroundWorkerConfig {
266                    interval,
267                    clear_bytes,
268                };
269                lgalloc::lgalloc_set_config(
270                    lgalloc::LgAlloc::new()
271                        .enable()
272                        .with_path(path.clone())
273                        .with_background_config(background_worker_config)
274                        .eager_return(eager_return)
275                        .file_growth_dampener(file_growth_dampener)
276                        .local_buffer_bytes(local_buffer_bytes),
277                );
278            } else {
279                debug!("not enabling lgalloc, scratch directory not specified");
280            }
281        } else {
282            info!("disabling lgalloc");
283            lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
284        }
285
286        crate::memory_limiter::apply_limiter_config(config);
287
288        mz_ore::region::ENABLE_LGALLOC_REGION.store(
289            ENABLE_COLUMNATION_LGALLOC.get(config),
290            std::sync::atomic::Ordering::Relaxed,
291        );
292
293        let enable_columnar_lgalloc = ENABLE_COLUMNAR_LGALLOC.get(config);
294        mz_timely_util::containers::set_enable_columnar_lgalloc(enable_columnar_lgalloc);
295
296        // Remember the maintenance interval locally to avoid reading it from the config set on
297        // every server iteration.
298        self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);
299
300        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
301        match overflowing_behavior.parse() {
302            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
303            Err(err) => {
304                error!(
305                    err,
306                    overflowing_behavior, "Invalid value for ore_overflowing_behavior"
307                );
308            }
309        }
310    }
311
312    /// Apply the provided replica expiration `offset` by converting it to a frontier relative to
313    /// the replica's initialization system time.
314    ///
315    /// Only expected to be called once when creating the instance. Guards against calling it
316    /// multiple times by checking if the local expiration time is set.
317    pub fn apply_expiration_offset(&mut self, offset: Duration) {
318        if self.replica_expiration.is_empty() {
319            let offset: EpochMillis = offset
320                .as_millis()
321                .try_into()
322                .expect("duration must fit within u64");
323            let replica_expiration_millis = self.init_system_time + offset;
324            let replica_expiration = Timestamp::from(replica_expiration_millis);
325
326            info!(
327                offset = %offset,
328                replica_expiration_millis = %replica_expiration_millis,
329                replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
330                "setting replica expiration",
331            );
332            self.replica_expiration = Antichain::from_elem(replica_expiration);
333
334            // Record the replica expiration in the metrics.
335            self.metrics
336                .replica_expiration_timestamp_seconds
337                .set(replica_expiration.into());
338        }
339    }
340
341    /// Returns the cc or non-cc version of "dataflow_max_inflight_bytes", as
342    /// appropriate to this replica.
343    pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
344        use mz_compute_types::dyncfgs::{
345            DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
346        };
347
348        if self.persist_clients.cfg.is_cc_active {
349            DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
350        } else {
351            DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
352        }
353    }
354}
355
/// A wrapper around [ComputeState] with a live timely worker and response channel.
///
/// Bundles the references a command handler needs: the worker to build/drop dataflows on,
/// the state to mutate, and the channel over which responses flow back to the controller.
pub(crate) struct ActiveComputeState<'a> {
    /// The underlying Timely worker.
    pub timely_worker: &'a mut TimelyWorker,
    /// The compute state itself.
    pub compute_state: &'a mut ComputeState,
    /// The channel over which frontier information is reported.
    pub response_tx: &'a mut ResponseSender,
}
365
/// A token that keeps a sink alive.
///
/// The wrapped value is never inspected; dropping the token is what releases the sink's
/// resources.
pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);

impl SinkToken {
    /// Create a new `SinkToken`.
    pub fn new(t: Box<dyn Any>) -> Self {
        Self(t)
    }
}
375
376impl<'a> ActiveComputeState<'a> {
377    /// Entrypoint for applying a compute command.
378    #[mz_ore::instrument(level = "debug")]
379    pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
380        use ComputeCommand::*;
381
382        self.compute_state.command_history.push(cmd.clone());
383
384        // Record the command duration, per worker and command kind.
385        let timer = self
386            .compute_state
387            .metrics
388            .handle_command_duration_seconds
389            .for_command(&cmd)
390            .start_timer();
391
392        match cmd {
393            Hello { .. } => panic!("Hello must be captured before"),
394            CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
395            InitializationComplete => (),
396            UpdateConfiguration(params) => self.handle_update_configuration(*params),
397            CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
398            Schedule(id) => self.handle_schedule(id),
399            AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
400            Peek(peek) => {
401                peek.otel_ctx.attach_as_parent();
402                self.handle_peek(*peek)
403            }
404            CancelPeek { uuid } => self.handle_cancel_peek(uuid),
405            AllowWrites(id) => {
406                self.handle_allow_writes(id);
407            }
408        }
409
410        timer.observe_duration();
411    }
412
    /// Handle a `CreateInstance` command: apply the initial worker configuration, set the
    /// replica expiration (if any), initialize logging dataflows, and record the peek stash
    /// location.
    fn handle_create_instance(&mut self, config: InstanceConfig) {
        // Ensure the state is consistent with the config before we initialize anything.
        self.compute_state.apply_worker_config();
        if let Some(offset) = config.expiration_offset {
            self.compute_state.apply_expiration_offset(offset);
        }

        self.initialize_logging(config.logging);

        self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
    }
424
    /// Handle an `UpdateConfiguration` command by applying each parameter to the relevant
    /// subsystem (metrics, tracing, dyncfgs, persist), then re-applying the worker config so
    /// derived state (e.g. `linear_join_spec`) reflects the new dyncfg values.
    fn handle_update_configuration(&mut self, params: ComputeParameters) {
        debug!("Applying configuration update: {params:?}");

        // Destructure so the compiler flags any newly added parameter we forget to handle.
        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client: _grpc_client,
            dyncfg_updates,
        } = params;

        if let Some(v) = workload_class {
            self.compute_state.metrics.set_workload_class(v);
        }
        if let Some(v) = max_result_size {
            self.compute_state.max_result_size = v;
        }

        tracing.apply(self.compute_state.tracing_handle.as_ref());

        dyncfg_updates.apply(&self.compute_state.worker_config);
        self.compute_state
            .persist_clients
            .cfg()
            .apply_from(&dyncfg_updates);

        // Note: We're only updating mz_metrics from the compute state here, but not from the
        // equivalent storage state. This is because they're running on the same process and
        // share the metrics.
        mz_metrics::update_dyncfg(&dyncfg_updates);

        // Re-derive worker-local state from the (possibly changed) dyncfg values.
        self.compute_state.apply_worker_config();
    }
458
    /// Handle a `CreateDataflow` command: set up per-collection state for every export, register
    /// a suspension token for each (computation starts only once all exports are `Schedule`d),
    /// and render the dataflow.
    fn handle_create_dataflow(
        &mut self,
        dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    ) {
        // The dataflow index is shared (via `Rc`) by all exported collections, so we can detect
        // in `drop_collection` when the last export is gone.
        let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());
        let as_of = dataflow.as_of.clone().unwrap();

        // Derive the expiration frontier from the dataflow's time dependence, if it has one;
        // otherwise default (empty, i.e. no expiration).
        let dataflow_expiration = dataflow
            .time_dependence
            .as_ref()
            .map(|time_dependence| {
                self.determine_dataflow_expiration(time_dependence, &dataflow.until)
            })
            .unwrap_or_default();

        // Add the dataflow expiration to `until`.
        let until = dataflow.until.meet(&dataflow_expiration);

        // Log at `debug` for transient dataflows and `info` otherwise, with the same fields.
        if dataflow.is_transient() {
            debug!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration
                    .as_option()
                    .map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        } else {
            info!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration
                    .as_option()
                    .map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        };

        let subscribe_copy_ids: BTreeSet<_> = dataflow
            .subscribe_ids()
            .chain(dataflow.copy_to_ids())
            .collect();

        // Initialize compute and logging state for each object.
        for object_id in dataflow.export_ids() {
            let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
            let metrics = self.compute_state.metrics.for_collection(object_id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of.clone(),
                metrics,
            );

            if let Some(logger) = self.compute_state.compute_logger.clone() {
                let logging = CollectionLogging::new(
                    object_id,
                    logger,
                    *dataflow_index,
                    dataflow.import_ids(),
                );
                collection.logging = Some(logging);
            }

            collection.reset_reported_frontiers(ReportedFrontier::NotReported {
                lower: as_of.clone(),
            });

            // A collision here indicates a protocol violation by the controller; log it but
            // proceed with the new collection state.
            let existing = self.compute_state.collections.insert(object_id, collection);
            if existing.is_some() {
                error!(
                    id = ?object_id,
                    "existing collection for newly created dataflow",
                );
            }
        }

        // All exports share one suspension token; the dataflow starts once every export has
        // been scheduled (see `handle_schedule`).
        let (start_signal, suspension_token) = StartSignal::new();
        for id in dataflow.export_ids() {
            self.compute_state
                .suspended_collections
                .insert(id, Rc::clone(&suspension_token));
        }

        crate::render::build_compute_dataflow(
            self.timely_worker,
            self.compute_state,
            dataflow,
            start_signal,
            until,
            dataflow_expiration,
        );
    }
564
565    fn handle_schedule(&mut self, id: GlobalId) {
566        // A `Schedule` command instructs us to begin dataflow computation for a collection, so
567        // we should unsuspend it by dropping the corresponding suspension token. Note that a
568        // dataflow can export multiple collections and they all share one suspension token, so the
569        // computation of a dataflow will only start once all its exported collections have been
570        // scheduled.
571        let suspension_token = self.compute_state.suspended_collections.remove(&id);
572        drop(suspension_token);
573    }
574
575    fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
576        if frontier.is_empty() {
577            // Indicates that we may drop `id`, as there are no more valid times to read.
578            self.drop_collection(id);
579        } else {
580            self.compute_state
581                .traces
582                .allow_compaction(id, frontier.borrow());
583        }
584    }
585
586    #[mz_ore::instrument(level = "debug")]
587    fn handle_peek(&mut self, peek: Peek) {
588        let pending = match &peek.target {
589            PeekTarget::Index { id } => {
590                // Acquire a copy of the trace suitable for fulfilling the peek.
591                let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
592                PendingPeek::index(peek, trace_bundle)
593            }
594            PeekTarget::Persist { metadata, .. } => {
595                let metadata = metadata.clone();
596                PendingPeek::persist(
597                    peek,
598                    Arc::clone(&self.compute_state.persist_clients),
599                    metadata,
600                    usize::cast_from(self.compute_state.max_result_size),
601                    self.timely_worker,
602                )
603            }
604        };
605
606        // Log the receipt of the peek.
607        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
608            logger.log(&pending.as_log_event(true));
609        }
610
611        self.process_peek(&mut Antichain::new(), pending);
612    }
613
614    fn handle_cancel_peek(&mut self, uuid: Uuid) {
615        if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
616            self.send_peek_response(peek, PeekResponse::Canceled);
617        }
618    }
619
620    fn handle_allow_writes(&mut self, id: GlobalId) {
621        // Enable persist compaction on any allow-writes command. We
622        // assume persist only compacts after making durable changes,
623        // such as appending a batch or advancing the upper.
624        self.compute_state.persist_clients.cfg().enable_compaction();
625
626        if let Some(collection) = self.compute_state.collections.get_mut(&id) {
627            collection.allow_writes();
628        } else {
629            soft_panic_or_log!("allow writes for unknown collection {id}");
630        }
631    }
632
    /// Drop the given collection.
    ///
    /// Removes the collection's state, trace, and suspension token, drops the backing dataflow
    /// once its last export is gone, and emits the final empty-frontiers response required by
    /// the compute protocol.
    ///
    /// Panics if the collection is not tracked.
    fn drop_collection(&mut self, id: GlobalId) {
        let collection = self
            .compute_state
            .collections
            .remove(&id)
            .expect("dropped untracked collection");

        // If this collection is an index, remove its trace.
        self.compute_state.traces.remove(&id);
        // If the collection is unscheduled, remove it from the list of waiting collections.
        self.compute_state.suspended_collections.remove(&id);

        // Drop the dataflow, if all its exports have been dropped.
        // `try_unwrap` succeeds only when this was the last `Rc` reference to the index, i.e.
        // every other export of the dataflow has already been dropped.
        if let Ok(index) = Rc::try_unwrap(collection.dataflow_index) {
            self.timely_worker.drop_dataflow(index);
        }

        // The compute protocol requires us to send a `Frontiers` response with empty frontiers
        // when a collection was dropped, unless:
        //  * The frontier was already reported as empty previously, or
        //  * The collection is a subscribe or copy-to.
        if !collection.is_subscribe_or_copy {
            let reported = collection.reported_frontiers;
            // Only include a frontier in the response if it wasn't already reported as empty.
            let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
            let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
            let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);

            let frontiers = FrontiersResponse {
                write_frontier,
                input_frontier,
                output_frontier,
            };
            if frontiers.has_updates() {
                self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
            }
        }
    }
671
    /// Initializes timely dataflow logging and publishes as a view.
    ///
    /// Builds the logging dataflows, installs each produced trace as a maintained index, and
    /// sets up `CollectionState` for every logging collection.
    ///
    /// Panics if logging was already initialized, or if `config.index_logs` requests an index
    /// that `logging::initialize` did not produce a trace for.
    pub fn initialize_logging(&mut self, config: LoggingConfig) {
        if self.compute_state.compute_logger.is_some() {
            panic!("dataflow server has already initialized logging");
        }

        let LoggingTraces {
            traces,
            dataflow_index,
            compute_logger: logger,
        } = logging::initialize(
            self.timely_worker,
            &config,
            self.compute_state.metrics_registry.clone(),
            Rc::clone(&self.compute_state.worker_config),
            self.compute_state.workers_per_process,
        );

        // All logging collections share the single logging dataflow's index.
        let dataflow_index = Rc::new(dataflow_index);
        let mut log_index_ids = config.index_logs;
        for (log, trace) in traces {
            // Install trace as maintained index.
            let id = log_index_ids
                .remove(&log)
                .expect("`logging::initialize` does not invent logs");
            self.compute_state.traces.set(id, trace);

            // Initialize compute and logging state for the logging index.
            let is_subscribe_or_copy = false;
            // Logging indexes are valid from the minimum timestamp.
            let as_of = Antichain::from_elem(Timestamp::MIN);
            let metrics = self.compute_state.metrics.for_collection(id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of,
                metrics,
            );

            let logging =
                CollectionLogging::new(id, logger.clone(), *dataflow_index, std::iter::empty());
            collection.logging = Some(logging);

            let existing = self.compute_state.collections.insert(id, collection);
            if existing.is_some() {
                error!(
                    id = ?id,
                    "existing collection for newly initialized logging export",
                );
            }
        }

        // Sanity check.
        assert!(
            log_index_ids.is_empty(),
            "failed to create requested logging indexes: {log_index_ids:?}",
        );

        self.compute_state.compute_logger = Some(logger);
    }
731
    /// Send progress information to the controller.
    ///
    /// For every tracked collection (except subscribes and copy-tos, which report
    /// progress through their own response types), collects the current write,
    /// input, and output frontiers, and emits a `Frontiers` response for each
    /// collection whose frontiers have advanced since the last report.
    pub fn report_frontiers(&mut self) {
        let mut responses = Vec::new();

        // Maintain a single allocation for `new_frontier` to avoid allocating on every iteration.
        let mut new_frontier = Antichain::new();

        for (&id, collection) in self.compute_state.collections.iter_mut() {
            // The compute protocol does not allow `Frontiers` responses for subscribe and copy-to
            // collections (database-issues#4701).
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers();

            // Collect the write frontier and check for progress.
            new_frontier.clear();
            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
                // An indexed collection's write frontier is the upper of its oks trace.
                assert!(
                    collection.sink_write_frontier.is_none(),
                    "collection {id} has multiple frontiers"
                );
                traces.oks_mut().read_upper(&mut new_frontier);
            } else if let Some(frontier) = &collection.sink_write_frontier {
                // A sink's write frontier is maintained in a shared cell by the sink dataflow.
                new_frontier.clone_from(&frontier.borrow());
            } else {
                error!(id = ?id, "collection without write frontier");
                continue;
            }
            // `allows_reporting` prevents reporting a regressed frontier; only
            // produce an update when the frontier has actually advanced.
            let new_write_frontier = reported
                .write_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Collect the output frontier and check for progress.
            //
            // By default, the output frontier equals the write frontier (which is still stored in
            // `new_frontier`). If the collection provides a compute frontier, we construct the
            // output frontier by taking the meet of write and compute frontier, to avoid:
            //  * reporting progress through times we have not yet written
            //  * reporting progress through times we have not yet fully processed, for
            //    collections that jump their write frontiers into the future
            //
            // As a special case, in read-only mode we don't take the write frontier into account.
            // The dataflow doesn't have the ability to push it forward, so it can't be used as a
            // measure of dataflow progress.
            if let Some(probe) = &collection.compute_probe {
                if *collection.read_only_rx.borrow() {
                    new_frontier.clear();
                }
                // Extending an antichain with the probe's elements yields the meet
                // of the two frontiers.
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_output_frontier = reported
                .output_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Collect the input frontier (the meet of all input probe frontiers)
            // and check for progress.
            new_frontier.clear();
            for probe in collection.input_probes.values() {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_input_frontier = reported
                .input_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Record what we are about to report, so subsequent calls only report
            // further advancement.
            if let Some(frontier) = &new_write_frontier {
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_input_frontier {
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_output_frontier {
                collection
                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
            }

            let response = FrontiersResponse {
                write_frontier: new_write_frontier,
                input_frontier: new_input_frontier,
                output_frontier: new_output_frontier,
            };
            if response.has_updates() {
                responses.push((id, response));
            }
        }

        for (id, frontiers) in responses {
            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
        }
    }
827
828    /// Report per-worker metrics.
829    pub(crate) fn report_metrics(&self) {
830        if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
831            let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
832            let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
833            let remaining = expiration - now;
834            self.compute_state
835                .metrics
836                .replica_expiration_remaining_seconds
837                .set(remaining)
838        }
839    }
840
    /// Either complete the peek (and send the response) or put it in the pending set.
    ///
    /// `upper` is scratch space provided by the caller; it is overwritten with
    /// trace uppers while checking index peeks for readiness.
    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
        let response = match &mut peek {
            PendingPeek::Index(peek) => {
                let start = Instant::now();

                // Only peeks whose finishing is streamable for this arity may be
                // spilled to the peek stash.
                let peek_stash_eligible = peek
                    .peek
                    .finishing
                    .is_streamable(peek.peek.result_desc.arity());

                // The stash can only be used when the feature flag is on AND a
                // stash persist location was configured; flag the inconsistent
                // configuration but degrade gracefully.
                let peek_stash_enabled = {
                    let enabled = ENABLE_PEEK_RESPONSE_STASH.get(&self.compute_state.worker_config);
                    let peek_persist_stash_available =
                        self.compute_state.peek_stash_persist_location.is_some();
                    if !peek_persist_stash_available && enabled {
                        error!("missing peek_stash_persist_location but peek stash is enabled");
                    }
                    enabled && peek_persist_stash_available
                };

                let peek_stash_threshold_bytes =
                    PEEK_RESPONSE_STASH_THRESHOLD_BYTES.get(&self.compute_state.worker_config);

                // Bundle references to the phase histograms so `seek_fulfillment`
                // can attribute time to each phase of the peek.
                let metrics = IndexPeekMetrics {
                    seek_fulfillment_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_seek_fulfillment_seconds,
                    frontier_check_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_frontier_check_seconds,
                    error_scan_seconds: &self.compute_state.metrics.index_peek_error_scan_seconds,
                    cursor_setup_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_cursor_setup_seconds,
                    row_iteration_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_iteration_seconds,
                    result_sort_seconds: &self.compute_state.metrics.index_peek_result_sort_seconds,
                    row_collection_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_collection_seconds,
                };

                let status = peek.seek_fulfillment(
                    upper,
                    self.compute_state.max_result_size,
                    peek_stash_enabled && peek_stash_eligible,
                    peek_stash_threshold_bytes,
                    &metrics,
                );

                // Observed even for not-ready peeks, so this includes cheap
                // readiness checks as well as full fulfillment attempts.
                self.compute_state
                    .metrics
                    .index_peek_total_seconds
                    .observe(start.elapsed().as_secs_f64());

                match status {
                    PeekStatus::Ready(result) => Some(result),
                    PeekStatus::NotReady => None,
                    PeekStatus::UsePeekStash => {
                        // The result is to be uploaded to the peek stash by an
                        // async task; convert this peek into a `Stash` peek and
                        // return early — the stash peek re-enters via the
                        // pending set.
                        let _span =
                            span!(parent: &peek.span, Level::DEBUG, "process_stash_peek").entered();

                        let peek_stash_batch_max_runs = PEEK_RESPONSE_STASH_BATCH_MAX_RUNS
                            .get(&self.compute_state.worker_config);

                        let stash_task = peek_stash::StashingPeek::start_upload(
                            Arc::clone(&self.compute_state.persist_clients),
                            self.compute_state
                                .peek_stash_persist_location
                                .as_ref()
                                .expect("verified above"),
                            peek.peek.clone(),
                            peek.trace_bundle.clone(),
                            peek_stash_batch_max_runs,
                        );

                        self.compute_state
                            .pending_peeks
                            .insert(peek.peek.uuid, PendingPeek::Stash(stash_task));
                        return;
                    }
                }
            }
            // A persist peek is fulfilled by its background task; `try_recv`
            // yields a result only once that task has finished.
            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
                self.compute_state
                    .metrics
                    .persist_peek_seconds
                    .observe(duration.as_secs_f64());
                result
            }),
            PendingPeek::Stash(stashing_peek) => {
                // Let the stashing peek make bounded progress before checking
                // whether the upload has completed.
                let num_batches = PEEK_STASH_NUM_BATCHES.get(&self.compute_state.worker_config);
                let batch_size = PEEK_STASH_BATCH_SIZE.get(&self.compute_state.worker_config);
                stashing_peek.pump_rows(num_batches, batch_size);

                if let Ok((response, duration)) = stashing_peek.result.try_recv() {
                    self.compute_state
                        .metrics
                        .stashed_peek_seconds
                        .observe(duration.as_secs_f64());
                    trace!(?stashing_peek.peek, ?duration, "finished stashing peek response in persist");

                    Some(response)
                } else {
                    None
                }
            }
        };

        if let Some(response) = response {
            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek_response").entered();
            self.send_peek_response(peek, response)
        } else {
            // Not ready yet: put the peek (back) into the pending set so it is
            // retried on a later call.
            let uuid = peek.peek().uuid;
            self.compute_state.pending_peeks.insert(uuid, peek);
        }
    }
965
966    /// Scan pending peeks and attempt to retire each.
967    pub fn process_peeks(&mut self) {
968        let mut upper = Antichain::new();
969        let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
970        for (_uuid, peek) in pending_peeks {
971            self.process_peek(&mut upper, peek);
972        }
973    }
974
975    /// Sends a response for this peek's resolution to the coordinator.
976    ///
977    /// Note that this function takes ownership of the `PendingPeek`, which is
978    /// meant to prevent multiple responses to the same peek.
979    #[mz_ore::instrument(level = "debug")]
980    fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
981        let log_event = peek.as_log_event(false);
982        // Respond with the response.
983        self.send_compute_response(ComputeResponse::PeekResponse(
984            peek.peek().uuid,
985            response,
986            OpenTelemetryContext::obtain(),
987        ));
988
989        // Log responding to the peek request.
990        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
991            logger.log(&log_event);
992        }
993    }
994
995    /// Scan the shared subscribe response buffer, and forward results along.
996    pub fn process_subscribes(&mut self) {
997        let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
998        for (sink_id, mut response) in subscribe_responses.drain(..) {
999            // Update frontier logging for this subscribe.
1000            if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
1001                let new_frontier = match &response {
1002                    SubscribeResponse::Batch(b) => b.upper.clone(),
1003                    SubscribeResponse::DroppedAt(_) => Antichain::new(),
1004                };
1005
1006                let reported = collection.reported_frontiers();
1007                assert!(
1008                    reported.write_frontier.allows_reporting(&new_frontier),
1009                    "subscribe write frontier regression: {:?} -> {:?}",
1010                    reported.write_frontier,
1011                    new_frontier,
1012                );
1013                assert!(
1014                    reported.input_frontier.allows_reporting(&new_frontier),
1015                    "subscribe input frontier regression: {:?} -> {:?}",
1016                    reported.input_frontier,
1017                    new_frontier,
1018                );
1019
1020                collection
1021                    .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
1022                collection
1023                    .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
1024                collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
1025            } else {
1026                // Presumably tracking state for this subscribe was already dropped by
1027                // `drop_collection`. There is nothing left to do for logging.
1028            }
1029
1030            response
1031                .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
1032            self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
1033        }
1034    }
1035
1036    /// Scan the shared copy to response buffer, and forward results along.
1037    pub fn process_copy_tos(&self) {
1038        let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
1039        for (sink_id, response) in responses.drain(..) {
1040            self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
1041        }
1042    }
1043
1044    /// Send a response to the coordinator.
1045    fn send_compute_response(&self, response: ComputeResponse) {
1046        // Ignore send errors because the coordinator is free to ignore our
1047        // responses. This happens during shutdown.
1048        let _ = self.response_tx.send(response);
1049    }
1050
    /// Checks for dataflow expiration. Panics if we're past the replica expiration time.
    pub(crate) fn check_expiration(&self) {
        let now = mz_ore::now::SYSTEM_TIME();
        if self.compute_state.replica_expiration.less_than(&now.into()) {
            let now_datetime = mz_ore::now::to_datetime(now);
            let expiration_datetime = self
                .compute_state
                .replica_expiration
                .as_option()
                .map(Into::into)
                .map(mz_ore::now::to_datetime);

            // We error and assert separately to produce structured logs in anything that depends
            // on tracing.
            error!(
                now,
                now_datetime = ?now_datetime,
                expiration = ?self.compute_state.replica_expiration.elements(),
                expiration_datetime = ?expiration_datetime,
                "replica expired"
            );

            // Repeat condition for better error message.
            // The condition was already established true above, so this assert
            // always fires here; it exists to panic with a human-readable
            // message carrying the full context.
            assert!(
                !self.compute_state.replica_expiration.less_than(&now.into()),
                "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
                self.compute_state.replica_expiration.elements(),
            );
        }
    }
1081
1082    /// Returns the dataflow expiration, i.e, the timestamp beyond which diffs can be
1083    /// dropped.
1084    ///
1085    /// Returns an empty timestamp if `replica_expiration` is unset or matches conditions under
1086    /// which dataflow expiration should be disabled.
1087    pub fn determine_dataflow_expiration(
1088        &self,
1089        time_dependence: &TimeDependence,
1090        until: &Antichain<Timestamp>,
1091    ) -> Antichain<Timestamp> {
1092        // Evaluate time dependence with respect to the expiration time.
1093        // * Step time forward to ensure the expiration time is different to the moment a dataflow
1094        //   can legitimately jump to.
1095        // * We cannot expire dataflow with an until that is less or equal to the expiration time.
1096        let iter = self
1097            .compute_state
1098            .replica_expiration
1099            .iter()
1100            .filter_map(|t| time_dependence.apply(*t))
1101            .filter_map(|t| Timestamp::try_step_forward(&t))
1102            .filter(|expiration| !until.less_equal(expiration));
1103        Antichain::from_iter(iter)
1104    }
1105}
1106
/// A peek against either an index or a Persist collection.
///
/// Note that `PendingPeek` intentionally does not implement or derive `Clone`,
/// as each `PendingPeek` is meant to be dropped after it's responded to.
pub enum PendingPeek {
    /// A peek against an index. (Possibly a temporary index created for the purpose.)
    Index(IndexPeek),
    /// A peek against a Persist-backed collection.
    ///
    /// Fulfilled by a background task that is aborted when the peek is dropped.
    Persist(PersistPeek),
    /// A peek against an index that is being stashed in the peek stash by an
    /// async background task.
    Stash(peek_stash::StashingPeek),
}
1120
impl PendingPeek {
    /// Produces a corresponding log event.
    ///
    /// The `installed` flag is passed through verbatim into the emitted event.
    pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
        let peek = self.peek();
        // Derive the logged collection ID and peek type from the peek's target.
        // Stashed peeks target indexes, so `PeekTarget::Index` covers them too.
        let (id, peek_type) = match &peek.target {
            PeekTarget::Index { id } => (*id, logging::compute::PeekType::Index),
            PeekTarget::Persist { id, .. } => (*id, logging::compute::PeekType::Persist),
        };
        let uuid = peek.uuid.into_bytes();
        ComputeEvent::Peek(PeekEvent {
            id,
            time: peek.timestamp,
            uuid,
            peek_type,
            installed,
        })
    }

    /// Creates a pending peek against an index backed by `trace_bundle`.
    ///
    /// Advances the logical compaction frontier of both traces to the peek
    /// timestamp, and downgrades their physical compaction frontier to the
    /// empty frontier (releasing any hold on physical batch merging).
    fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
        let empty_frontier = Antichain::new();
        let timestamp_frontier = Antichain::from_elem(peek.timestamp);
        trace_bundle
            .oks_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .oks_mut()
            .set_physical_compaction(empty_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_physical_compaction(empty_frontier.borrow());

        PendingPeek::Index(IndexPeek {
            peek,
            trace_bundle,
            span: tracing::Span::current(),
        })
    }

    /// Creates a pending peek against a Persist-backed collection.
    ///
    /// Spawns a background task that performs the actual read. Exactly one
    /// worker (chosen consistently by hashing the peek UUID) reads the shard;
    /// all other workers contribute an empty result.
    fn persist(
        peek: Peek,
        persist_clients: Arc<PersistClientCache>,
        metadata: CollectionMetadata,
        max_result_size: usize,
        timely_worker: &TimelyWorker,
    ) -> Self {
        let active_worker = {
            // Choose the worker that does the actual peek arbitrarily but consistently.
            let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
            chosen_index == timely_worker.index()
        };
        // Activator to wake the timely worker once the task has produced a result.
        let activator = timely_worker.sync_activator_for([].into());
        let peek_uuid = peek.uuid;

        let (result_tx, result_rx) = oneshot::channel();
        let timestamp = peek.timestamp;
        let mfp_plan = peek.map_filter_project.clone();
        // At most `limit + offset` rows are needed, if a limit is given.
        let max_results_needed = peek
            .finishing
            .limit
            .map(|l| usize::cast_from(u64::from(l)))
            .unwrap_or(usize::MAX)
            + peek.finishing.offset;
        let order_by = peek.finishing.order_by.clone();

        // Persist peeks can include at most one literal constraint.
        let literal_constraint = peek
            .literal_constraints
            .clone()
            .map(|rows| rows.into_element());

        let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
            let start = Instant::now();
            let result = if active_worker {
                PersistPeek::do_peek(
                    &persist_clients,
                    metadata,
                    timestamp,
                    literal_constraint,
                    mfp_plan,
                    max_result_size,
                    max_results_needed,
                )
                .await
            } else {
                Ok(vec![])
            };
            let result = match result {
                Ok(rows) => PeekResponse::Rows(vec![RowCollection::new(rows, &order_by)]),
                Err(e) => PeekResponse::Error(e.to_string()),
            };
            // A send error means the receiver (and thus the peek) was dropped;
            // there is nobody left to care about the result.
            match result_tx.send((result, start.elapsed())) {
                Ok(()) => {}
                Err((_result, elapsed)) => {
                    debug!(duration =? elapsed, "dropping result for cancelled peek {peek_uuid}")
                }
            }
            match activator.activate() {
                Ok(()) => {}
                Err(_) => {
                    debug!("unable to wake timely after completed peek {peek_uuid}");
                }
            }
        });
        PendingPeek::Persist(PersistPeek {
            peek,
            _abort_handle: task_handle.abort_on_drop(),
            result: result_rx,
            span: tracing::Span::current(),
        })
    }

    /// Returns the `tracing::Span` tracking this peek's operation.
    fn span(&self) -> &tracing::Span {
        match self {
            PendingPeek::Index(p) => &p.span,
            PendingPeek::Persist(p) => &p.span,
            PendingPeek::Stash(p) => &p.span,
        }
    }

    /// Returns the description of the peek being fulfilled.
    pub(crate) fn peek(&self) -> &Peek {
        match self {
            PendingPeek::Index(p) => &p.peek,
            PendingPeek::Persist(p) => &p.peek,
            PendingPeek::Stash(p) => &p.peek,
        }
    }
}
1251
/// An in-progress Persist peek.
///
/// Note that `PersistPeek`, like [`PendingPeek`], intentionally does not
/// implement or derive `Clone`, as each peek is meant to be dropped after
/// it's responded to.
pub struct PersistPeek {
    /// The description of the peek being fulfilled.
    pub(crate) peek: Peek,
    /// A background task that's responsible for producing the peek results.
    /// If we're no longer interested in the results, we abort the task.
    _abort_handle: AbortOnDropHandle<()>,
    /// The result of the background task, eventually.
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    /// The `tracing::Span` tracking this peek's operation
    span: tracing::Span,
}
1266
impl PersistPeek {
    /// Reads the peek's result directly from a persist shard.
    ///
    /// Applies the optional literal constraint and the peek's MFP while
    /// scanning, stopping early once `limit_remaining` rows have been
    /// gathered.
    ///
    /// Returns the matching rows with their multiplicities, or an error string
    /// if the shard cannot be read at `as_of`, contains negative
    /// multiplicities, or the accumulated result exceeds `max_result_size`.
    async fn do_peek(
        persist_clients: &PersistClientCache,
        metadata: CollectionMetadata,
        as_of: Timestamp,
        literal_constraint: Option<Row>,
        mfp_plan: SafeMfpPlan,
        max_result_size: usize,
        mut limit_remaining: usize,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
        let client = persist_clients
            .open(metadata.persist_location)
            .await
            .map_err(|e| e.to_string())?;

        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
            .open_leased_reader(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("persist::peek"),
                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
            )
            .await
            .map_err(|e| e.to_string())?;

        // If we are using txn-wal for this collection, then the upper might
        // be advanced lazily and we have to go through txn-wal for reads.
        //
        // TODO: If/when we have a process-wide TxnsRead worker for clusterd,
        // use in here (instead of opening a new TxnsCache) to save a persist
        // reader registration and some txns shard read traffic.
        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
        } else {
            None
        };

        let metrics = client.metrics();

        let mut cursor = StatsCursor::new(
            &mut reader,
            txns_read.as_mut(),
            metrics,
            &mfp_plan,
            &metadata.relation_desc,
            Antichain::from_elem(as_of),
        )
        .await
        .map_err(|since| {
            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
        })?;

        // Re-used state for processing and building rows.
        let mut result = vec![];
        let mut datum_vec = DatumVec::new();
        let mut row_builder = Row::default();
        let arena = RowArena::new();
        let mut total_size = 0usize;

        // Number of leading columns that the literal constraint pins down.
        let literal_len = match &literal_constraint {
            None => 0,
            Some(row) => row.iter().count(),
        };

        'collect: while limit_remaining > 0 {
            let Some(batch) = cursor.next().await else {
                break;
            };
            for (data, _, d) in batch {
                let row = data.map_err(|e| e.to_string())?;

                if let Some(literal) = &literal_constraint {
                    // Compare the row's prefix against the constraint; once we
                    // see a greater prefix there can be no further matches.
                    // NOTE(review): the `Greater => break 'collect` early exit
                    // assumes the cursor yields rows in key order — confirm
                    // against `StatsCursor`.
                    match row.iter().take(literal_len).cmp(literal.iter()) {
                        Ordering::Less => continue,
                        Ordering::Equal => {}
                        Ordering::Greater => break 'collect,
                    }
                }

                // A negative diff indicates invalid data in the shard; surface
                // it as an error rather than producing wrong results.
                let count: usize = d.try_into().map_err(|_| {
                    error!(
                        shard = %metadata.data_shard, diff = d, ?row,
                        "persist peek encountered negative multiplicities",
                    );
                    format!(
                        "Invalid data in source, \
                         saw retractions ({}) for row that does not exist: {:?}",
                        -d, row,
                    )
                })?;
                let Some(count) = NonZeroUsize::new(count) else {
                    continue;
                };
                let mut datum_local = datum_vec.borrow_with(&row);
                let eval_result = mfp_plan
                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
                    .map(|row| row.cloned())
                    .map_err(|e| e.to_string())?;
                if let Some(row) = eval_result {
                    // Enforce the result size budget before retaining the row.
                    total_size = total_size
                        .saturating_add(row.byte_len())
                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
                    if total_size > max_result_size {
                        return Err(format!(
                            "result exceeds max size of {}",
                            ByteSize::b(u64::cast_from(max_result_size))
                        ));
                    }
                    result.push((row, count));
                    limit_remaining = limit_remaining.saturating_sub(count.get());
                    if limit_remaining == 0 {
                        break;
                    }
                }
            }
        }

        Ok(result)
    }
}
1388
/// An in-progress index-backed peek, and data to eventually fulfill it.
pub struct IndexPeek {
    /// The description of the peek being fulfilled.
    peek: Peek,
    /// The data from which the trace derives.
    trace_bundle: TraceBundle,
    /// The `tracing::Span` tracking this peek's operation
    span: tracing::Span,
}
1397
/// Histogram metrics for index peek phases.
///
/// This struct bundles references to the various histogram metrics used to
/// instrument the index peek processing pipeline.
pub(crate) struct IndexPeekMetrics<'a> {
    /// Total time spent in `IndexPeek::seek_fulfillment`.
    pub seek_fulfillment_seconds: &'a prometheus::Histogram,
    /// Time spent checking trace frontiers for peek readiness.
    pub frontier_check_seconds: &'a prometheus::Histogram,
    /// Time attributed to scanning the errs trace for errors.
    pub error_scan_seconds: &'a prometheus::Histogram,
    /// Time attributed to the cursor-setup phase of an index peek.
    pub cursor_setup_seconds: &'a prometheus::Histogram,
    /// Time attributed to the row-iteration phase of an index peek.
    pub row_iteration_seconds: &'a prometheus::Histogram,
    /// Time attributed to the result-sorting phase of an index peek.
    pub result_sort_seconds: &'a prometheus::Histogram,
    /// Time attributed to building the final row collection.
    pub row_collection_seconds: &'a prometheus::Histogram,
}
1411
1412impl IndexPeek {
    /// Attempts to fulfill the peek and reports success.
    ///
    /// To produce output at `peek.timestamp`, we must be certain that
    /// it is no longer changing. A trace guarantees that all future
    /// changes will be greater than or equal to an element of `upper`.
    ///
    /// If an element of `upper` is less or equal to `peek.timestamp`,
    /// then there can be further updates that would change the output.
    /// If no element of `upper` is less or equal to `peek.timestamp`,
    /// then for any time `t` less or equal to `peek.timestamp` it is
    /// not the case that `upper` is less or equal to that timestamp,
    /// and so the result cannot further evolve.
    ///
    /// Returns `PeekStatus::NotReady` if either trace has not yet advanced
    /// beyond the peek timestamp; otherwise delegates to
    /// `collect_finished_data` for the actual read. `upper` is overwritten
    /// with the traces' uppers as a side effect.
    fn seek_fulfillment(
        &mut self,
        upper: &mut Antichain<Timestamp>,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus {
        let method_start = Instant::now();

        // Both the oks and the errs trace must have advanced beyond the peek
        // timestamp before the result is final.
        self.trace_bundle.oks_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }
        self.trace_bundle.errs_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }

        // If compaction has advanced past the peek timestamp, the requested
        // time is no longer readable; surface this as an error response.
        let read_frontier = self.trace_bundle.compaction_frontier();
        if !read_frontier.less_equal(&self.peek.timestamp) {
            let error = format!(
                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
                read_frontier.elements(),
                self.peek.timestamp,
            );
            return PeekStatus::Ready(PeekResponse::Error(error));
        }

        metrics
            .frontier_check_seconds
            .observe(method_start.elapsed().as_secs_f64());

        let result = self.collect_finished_data(
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
            metrics,
        );

        metrics
            .seek_fulfillment_seconds
            .observe(method_start.elapsed().as_secs_f64());

        result
    }
1471
1472    /// Collects data for a known-complete peek from the ok stream.
1473    fn collect_finished_data(
1474        &mut self,
1475        max_result_size: u64,
1476        peek_stash_eligible: bool,
1477        peek_stash_threshold_bytes: usize,
1478        metrics: &IndexPeekMetrics<'_>,
1479    ) -> PeekStatus {
1480        let error_scan_start = Instant::now();
1481
1482        // Check if there exist any errors and, if so, return whatever one we
1483        // find first.
1484        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
1485        while cursor.key_valid(&storage) {
1486            let mut copies = Diff::ZERO;
1487            cursor.map_times(&storage, |time, diff| {
1488                if time.less_equal(&self.peek.timestamp) {
1489                    copies += diff;
1490                }
1491            });
1492            if copies.is_negative() {
1493                let error = cursor.key(&storage);
1494                error!(
1495                    target = %self.peek.target.id(), diff = %copies, %error,
1496                    "index peek encountered negative multiplicities in error trace",
1497                );
1498                return PeekStatus::Ready(PeekResponse::Error(format!(
1499                    "Invalid data in source errors, \
1500                    saw retractions ({}) for row that does not exist: {}",
1501                    -copies, error,
1502                )));
1503            }
1504            if copies.is_positive() {
1505                return PeekStatus::Ready(PeekResponse::Error(cursor.key(&storage).to_string()));
1506            }
1507            cursor.step_key(&storage);
1508        }
1509
1510        metrics
1511            .error_scan_seconds
1512            .observe(error_scan_start.elapsed().as_secs_f64());
1513
1514        Self::collect_ok_finished_data(
1515            &self.peek,
1516            self.trace_bundle.oks_mut(),
1517            max_result_size,
1518            peek_stash_eligible,
1519            peek_stash_threshold_bytes,
1520            metrics,
1521        )
1522    }
1523
    /// Collects data for a known-complete peek from the ok stream.
    ///
    /// Returns `PeekStatus::UsePeekStash` if the accumulated result grows past
    /// `peek_stash_threshold_bytes` while `peek_stash_eligible` is set, an
    /// error response if it exceeds `max_result_size`, and otherwise the
    /// finished rows, possibly thinned down according to the peek's finishing.
    fn collect_ok_finished_data<Tr>(
        peek: &Peek,
        oks_handle: &mut Tr,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus
    where
        for<'a> Tr: TraceReader<
                Key<'a>: ToDatumIter + Eq,
                KeyContainer: BatchContainer<Owned = Row>,
                Val<'a>: ToDatumIter,
                TimeGat<'a>: PartialOrder<Timestamp>,
                DiffGat<'a> = &'a Diff,
            >,
    {
        let max_result_size = usize::cast_from(max_result_size);
        // Per-row overhead for the count that accompanies each row in the
        // accounted result size.
        let count_byte_size = size_of::<NonZeroUsize>();

        // Cursor setup timing
        let cursor_setup_start = Instant::now();

        // We clone `literal_constraints` here because we don't want to move the constraints
        // out of the peek struct, and don't want to modify in-place.
        let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
            peek.target.id().clone(),
            peek.map_filter_project.clone(),
            peek.timestamp,
            peek.literal_constraints.clone().as_deref_mut(),
            oks_handle,
        );

        metrics
            .cursor_setup_seconds
            .observe(cursor_setup_start.elapsed().as_secs_f64());

        // Accumulated `Vec<(row, count)>` results that we are likely to return.
        let mut results = Vec::new();
        // Running size estimate of `results`, in bytes (rows + counts).
        let mut total_size: usize = 0;

        // When set, a bound on the number of records we need to return.
        // The requirements on the records are driven by the finishing's
        // `order_by` field. Further limiting will happen when the results
        // are collected, so we don't need to have exactly this many results,
        // just at least those results that would have been returned.
        let max_results = peek.finishing.num_rows_needed();

        let comparator = RowComparator::new(peek.finishing.order_by.as_slice());

        // Row iteration timing
        let row_iteration_start = Instant::now();
        let mut sort_time_accum = Duration::ZERO;

        while let Some(row) = peek_iterator.next() {
            let row: (Row, _) = match row {
                Ok(row) => row,
                Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)),
            };
            let (row, copies) = row;
            let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");

            total_size = total_size
                .saturating_add(row.byte_len())
                .saturating_add(count_byte_size);
            // Prefer the peek stash over an inline response once the result
            // grows past the configured threshold.
            if peek_stash_eligible && total_size > peek_stash_threshold_bytes {
                return PeekStatus::UsePeekStash;
            }
            if total_size > max_result_size {
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "result exceeds max size of {}",
                    ByteSize::b(u64::cast_from(max_result_size))
                )));
            }

            results.push((row, copies));

            // If we hold many more than `max_results` records, we can thin down
            // `results` using `self.finishing.ordering`.
            if let Some(max_results) = max_results {
                // We use a threshold twice what we intend, to amortize the work
                // across all of the insertions. We could tighten this, but it
                // works for the moment.
                if results.len() >= 2 * max_results {
                    if peek.finishing.order_by.is_empty() {
                        // With no ordering requested, any `max_results` rows
                        // will do; we can return early.
                        results.truncate(max_results);
                        metrics
                            .row_iteration_seconds
                            .observe(row_iteration_start.elapsed().as_secs_f64());
                        metrics
                            .result_sort_seconds
                            .observe(sort_time_accum.as_secs_f64());
                        let row_collection_start = Instant::now();
                        let collection = RowCollection::new(results, &peek.finishing.order_by);
                        metrics
                            .row_collection_seconds
                            .observe(row_collection_start.elapsed().as_secs_f64());
                        return PeekStatus::Ready(PeekResponse::Rows(vec![collection]));
                    } else {
                        // We can sort `results` and then truncate to `max_results`.
                        // This has an effect similar to a priority queue, without
                        // its interactive dequeueing properties.
                        // TODO: Had we left these as `Vec<Datum>` we would avoid
                        // the unpacking; we should consider doing that, although
                        // it will require a re-pivot of the code to branch on this
                        // inner test (as we prefer not to maintain `Vec<Datum>`
                        // in the other case).
                        let sort_start = Instant::now();
                        // Ties under `order_by` are broken by the full row
                        // ordering, keeping the sort deterministic.
                        results.sort_by(|left, right| {
                            comparator.compare_rows(&left.0, &right.0, || left.0.cmp(&right.0))
                        });
                        sort_time_accum += sort_start.elapsed();
                        // Subtract the dropped rows from the size estimate so
                        // the size limits apply to what we actually retain.
                        let dropped = results.drain(max_results..);
                        let dropped_size =
                            dropped
                                .into_iter()
                                .fold(0, |acc: usize, (row, _count): (Row, _)| {
                                    acc.saturating_add(
                                        row.byte_len().saturating_add(count_byte_size),
                                    )
                                });
                        total_size = total_size.saturating_sub(dropped_size);
                    }
                }
            }
        }

        metrics
            .row_iteration_seconds
            .observe(row_iteration_start.elapsed().as_secs_f64());
        metrics
            .result_sort_seconds
            .observe(sort_time_accum.as_secs_f64());

        let row_collection_start = Instant::now();
        let collection = RowCollection::new(results, &peek.finishing.order_by);
        metrics
            .row_collection_seconds
            .observe(row_collection_start.elapsed().as_secs_f64());
        PeekStatus::Ready(PeekResponse::Rows(vec![collection]))
    }
1666}
1667
/// For keeping track of the state of pending or ready peeks, and managing
/// control flow.
enum PeekStatus {
    /// The frontiers of objects are not yet advanced enough, peek is still
    /// pending.
    ///
    /// Returned while the ok or err trace upper is still less than or equal
    /// to the peek timestamp.
    NotReady,
    /// The result size is above the configured threshold and the peek is
    /// eligible for using the peek result stash.
    UsePeekStash,
    /// The peek result is ready.
    Ready(PeekResponse),
}
1680
/// The frontiers we have reported to the controller for a collection.
///
/// Holds one [`ReportedFrontier`] per frontier kind reported for a
/// collection: write, input, and output.
#[derive(Debug)]
struct ReportedFrontiers {
    /// The reported write frontier.
    write_frontier: ReportedFrontier,
    /// The reported input frontier.
    input_frontier: ReportedFrontier,
    /// The reported output frontier.
    output_frontier: ReportedFrontier,
}
1691
1692impl ReportedFrontiers {
1693    /// Creates a new `ReportedFrontiers` instance.
1694    fn new() -> Self {
1695        Self {
1696            write_frontier: ReportedFrontier::new(),
1697            input_frontier: ReportedFrontier::new(),
1698            output_frontier: ReportedFrontier::new(),
1699        }
1700    }
1701}
1702
/// A frontier we have reported to the controller, or the least frontier we are allowed to report.
///
/// The `NotReported` state remembers the least frontier that may legally be
/// reported later.
#[derive(Clone, Debug)]
pub enum ReportedFrontier {
    /// A frontier has been previously reported.
    Reported(Antichain<Timestamp>),
    /// No frontier has been reported yet.
    NotReported {
        /// A lower bound for frontiers that may be reported in the future.
        lower: Antichain<Timestamp>,
    },
}
1714
1715impl ReportedFrontier {
1716    /// Create a new `ReportedFrontier` enforcing the minimum lower bound.
1717    pub fn new() -> Self {
1718        let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
1719        Self::NotReported { lower }
1720    }
1721
1722    /// Whether the reported frontier is the empty frontier.
1723    pub fn is_empty(&self) -> bool {
1724        match self {
1725            Self::Reported(frontier) => frontier.is_empty(),
1726            Self::NotReported { .. } => false,
1727        }
1728    }
1729
1730    /// Whether this `ReportedFrontier` allows reporting the given frontier.
1731    ///
1732    /// A `ReportedFrontier` allows reporting of another frontier if:
1733    ///  * The other frontier is greater than the reported frontier.
1734    ///  * The other frontier is greater than or equal to the lower bound.
1735    fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
1736        match self {
1737            Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
1738            Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
1739        }
1740    }
1741}
1742
/// State maintained for a compute collection.
pub struct CollectionState {
    /// Tracks the frontiers that have been reported to the controller.
    reported_frontiers: ReportedFrontiers,
    /// The index of the dataflow computing this collection.
    ///
    /// Used for dropping the dataflow when the collection is dropped.
    /// The dataflow index is wrapped in an `Rc` and can be shared between collections, to reflect
    /// the possibility that a single dataflow can export multiple collections.
    dataflow_index: Rc<usize>,
    /// Whether this collection is a subscribe or copy-to.
    ///
    /// The compute protocol does not allow `Frontiers` responses for subscribe and copy-to
    /// collections, so we need to be able to recognize them. This is something we would like to
    /// change in the future (database-issues#4701).
    pub is_subscribe_or_copy: bool,
    /// The collection's initial as-of frontier.
    ///
    /// Used to determine hydration status.
    as_of: Antichain<Timestamp>,

    /// A token that should be dropped when this collection is dropped to clean up associated
    /// sink state.
    ///
    /// Only `Some` if the collection is a sink.
    pub sink_token: Option<SinkToken>,
    /// Frontier of sink writes.
    ///
    /// Only `Some` if the collection is a sink and *not* a subscribe.
    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
    /// Frontier probes for every input to the collection.
    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
    /// A probe reporting the frontier of times through which all collection outputs have been
    /// computed (but not necessarily written).
    ///
    /// `None` for collections with compute frontiers equal to their write frontiers.
    pub compute_probe: Option<probe::Handle<Timestamp>>,
    /// Logging state maintained for this collection.
    logging: Option<CollectionLogging>,
    /// Metrics tracked for this collection.
    metrics: CollectionMetrics,
    /// Send-side to transition a dataflow from read-only mode to read-write mode.
    ///
    /// All dataflows start in read-only mode. Only after receiving a
    /// `AllowWrites` command from the controller will they transition to
    /// read-write mode.
    ///
    /// A dataflow in read-only mode must not affect any external state.
    ///
    /// NOTE: In the future, we might want a more complicated flag, for example
    /// something that tells us after which timestamp we are allowed to write.
    /// In this first version we are keeping things as simple as possible!
    read_only_tx: watch::Sender<bool>,
    /// Receive-side to observe whether a dataflow is in read-only mode.
    pub read_only_rx: watch::Receiver<bool>,
}
1799
1800impl CollectionState {
1801    fn new(
1802        dataflow_index: Rc<usize>,
1803        is_subscribe_or_copy: bool,
1804        as_of: Antichain<Timestamp>,
1805        metrics: CollectionMetrics,
1806    ) -> Self {
1807        // We always initialize as read_only=true. Only when we're explicitly
1808        // allowed to we switch to read-write.
1809        let (read_only_tx, read_only_rx) = watch::channel(true);
1810
1811        Self {
1812            reported_frontiers: ReportedFrontiers::new(),
1813            dataflow_index,
1814            is_subscribe_or_copy,
1815            as_of,
1816            sink_token: None,
1817            sink_write_frontier: None,
1818            input_probes: Default::default(),
1819            compute_probe: None,
1820            logging: None,
1821            metrics,
1822            read_only_tx,
1823            read_only_rx,
1824        }
1825    }
1826
1827    /// Return the frontiers that have been reported to the controller.
1828    fn reported_frontiers(&self) -> &ReportedFrontiers {
1829        &self.reported_frontiers
1830    }
1831
1832    /// Reset all reported frontiers to the given value.
1833    pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
1834        self.reported_frontiers.write_frontier = frontier.clone();
1835        self.reported_frontiers.input_frontier = frontier.clone();
1836        self.reported_frontiers.output_frontier = frontier;
1837    }
1838
1839    /// Set the write frontier that has been reported to the controller.
1840    fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
1841        if let Some(logging) = &mut self.logging {
1842            let time = match &frontier {
1843                ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
1844                ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
1845            };
1846            logging.set_frontier(time);
1847        }
1848
1849        self.reported_frontiers.write_frontier = frontier;
1850    }
1851
1852    /// Set the input frontier that has been reported to the controller.
1853    fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
1854        // Use this opportunity to update our input frontier logging.
1855        if let Some(logging) = &mut self.logging {
1856            for (id, probe) in &self.input_probes {
1857                let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
1858                logging.set_import_frontier(*id, new_time);
1859            }
1860        }
1861
1862        self.reported_frontiers.input_frontier = frontier;
1863    }
1864
1865    /// Set the output frontier that has been reported to the controller.
1866    fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
1867        let already_hydrated = self.hydrated();
1868
1869        self.reported_frontiers.output_frontier = frontier;
1870
1871        if !already_hydrated && self.hydrated() {
1872            if let Some(logging) = &mut self.logging {
1873                logging.set_hydrated();
1874            }
1875            self.metrics.record_collection_hydrated();
1876        }
1877    }
1878
1879    /// Return whether this collection is hydrated.
1880    fn hydrated(&self) -> bool {
1881        match &self.reported_frontiers.output_frontier {
1882            ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
1883            ReportedFrontier::NotReported { .. } => false,
1884        }
1885    }
1886
1887    /// Allow writes for this collection.
1888    fn allow_writes(&self) {
1889        info!(
1890            dataflow_index = *self.dataflow_index,
1891            export = ?self.logging.as_ref().map(|l| l.export_id()),
1892            "allowing writes for dataflow",
1893        );
1894        let _ = self.read_only_tx.send(false);
1895    }
1896}