mz_compute/compute_state.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Worker-local state for compute timely instances.
7
8use std::any::Any;
9use std::cell::RefCell;
10use std::cmp::Ordering;
11use std::collections::{BTreeMap, BTreeSet};
12use std::num::NonZeroUsize;
13
14use std::rc::Rc;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use bytesize::ByteSize;
19use differential_dataflow::Hashable;
20use differential_dataflow::lattice::Lattice;
21use differential_dataflow::trace::{Cursor, TraceReader};
22use mz_compute_client::logging::LoggingConfig;
23use mz_compute_client::protocol::command::{
24    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
25};
26use mz_compute_client::protocol::history::ComputeCommandHistory;
27use mz_compute_client::protocol::response::{
28    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, SubscribeResponse,
29};
30use mz_compute_types::dataflows::DataflowDescription;
31use mz_compute_types::dyncfgs::{
32    ENABLE_PEEK_RESPONSE_STASH, PEEK_RESPONSE_STASH_BATCH_MAX_RUNS,
33    PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE, PEEK_STASH_NUM_BATCHES,
34};
35use mz_compute_types::plan::render_plan::RenderPlan;
36use mz_dyncfg::ConfigSet;
37use mz_expr::SafeMfpPlan;
38use mz_expr::row::RowCollection;
39use mz_ore::cast::CastFrom;
40use mz_ore::collections::CollectionExt;
41use mz_ore::metrics::UIntGauge;
42use mz_ore::now::EpochMillis;
43use mz_ore::task::AbortOnDropHandle;
44use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
45use mz_persist_client::Diagnostics;
46use mz_persist_client::cache::PersistClientCache;
47use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
48use mz_persist_client::read::ReadHandle;
49use mz_persist_types::PersistLocation;
50use mz_persist_types::codec_impls::UnitSchema;
51use mz_repr::fixed_length::ToDatumIter;
52use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
53use mz_storage_operators::stats::StatsCursor;
54use mz_storage_types::StorageDiff;
55use mz_storage_types::controller::CollectionMetadata;
56use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
57use mz_storage_types::sources::SourceData;
58use mz_storage_types::time_dependence::TimeDependence;
59use mz_txn_wal::operator::TxnsContext;
60use mz_txn_wal::txn_cache::TxnsCache;
61use timely::communication::Allocate;
62use timely::dataflow::operators::probe;
63use timely::order::PartialOrder;
64use timely::progress::frontier::Antichain;
65use timely::scheduling::Scheduler;
66use timely::worker::Worker as TimelyWorker;
67use tokio::sync::{oneshot, watch};
68use tracing::{Level, debug, error, info, span, warn};
69use uuid::Uuid;
70
71use crate::arrangement::manager::{TraceBundle, TraceManager};
72use crate::logging;
73use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
74use crate::logging::initialize::LoggingTraces;
75use crate::metrics::{CollectionMetrics, WorkerMetrics};
76use crate::render::{LinearJoinSpec, StartSignal};
77use crate::server::{ComputeInstanceContext, ResponseSender};
78
79mod peek_result_iterator;
80mod peek_stash;
81
82/// Worker-local state that is maintained across dataflows.
83///
84/// This state is restricted to the COMPUTE state, the deterministic, idempotent work
85/// done between data ingress and egress.
86pub struct ComputeState {
87    /// State kept for each installed compute collection.
88    ///
89    /// Each collection has exactly one frontier.
90    /// How the frontier is communicated depends on the collection type:
91    ///  * Frontiers of indexes are equal to the frontier of their corresponding traces in the
92    ///    `TraceManager`.
93    ///  * Persist sinks store their current frontier in `CollectionState::sink_write_frontier`.
94    ///  * Subscribes report their frontiers through the `subscribe_response_buffer`.
95    pub collections: BTreeMap<GlobalId, CollectionState>,
96    /// Collections that were recently dropped and whose removal needs to be reported.
97    pub dropped_collections: Vec<(GlobalId, DroppedCollection)>,
98    /// The traces available for sharing across dataflows.
99    pub traces: TraceManager,
100    /// Shared buffer with SUBSCRIBE operator instances by which they can respond.
101    ///
102    /// The entries are pairs of sink identifier (to identify the subscribe instance)
103    /// and the response itself.
104    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
105    /// Shared buffer with S3 oneshot operator instances by which they can respond.
106    ///
107    /// The entries are pairs of sink identifier (to identify the S3 oneshot instance)
108    /// and the response itself.
109    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
110    /// Peek commands that are awaiting fulfillment.
111    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
112    /// The persist location where we can stash large peek results.
113    pub peek_stash_persist_location: Option<PersistLocation>,
114    /// The logger, from Timely's logging framework, if logs are enabled.
115    pub compute_logger: Option<logging::compute::Logger>,
116    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
117    /// This is intentionally shared between workers.
118    pub persist_clients: Arc<PersistClientCache>,
119    /// Context necessary for rendering txn-wal operators.
120    pub txns_ctx: TxnsContext,
121    /// History of commands received by this worker and all its peers.
122    pub command_history: ComputeCommandHistory<UIntGauge>,
123    /// Max size in bytes of any result.
124    max_result_size: u64,
125    /// Specification for rendering linear joins.
126    pub linear_join_spec: LinearJoinSpec,
127    /// Metrics for this worker.
128    pub metrics: WorkerMetrics,
129    /// A process-global handle to tracing configuration.
130    tracing_handle: Arc<TracingHandle>,
131    /// Other configuration for compute.
132    pub context: ComputeInstanceContext,
133    /// Per-worker dynamic configuration.
134    ///
135    /// This is separate from the process-global `ConfigSet` and contains config options that need
136    /// to be applied consistently with compute command order.
137    ///
138    /// For example, for options that influence dataflow rendering it is important that all workers
139    /// render the same dataflow with the same options. If these options were stored in a global
140    /// `ConfigSet`, we couldn't guarantee that all workers observe changes to them at the same
141    /// point in the stream of compute commands. Storing per-worker configuration ensures that
142    /// because each worker's configuration is only updated once that worker observes the
143    /// respective `UpdateConfiguration` command.
144    ///
145    /// Reference-counted to avoid cloning for `Context`.
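    ///
    /// A minimal sketch of how rendering code reads an option from this config (using
    /// `ENABLE_PEEK_RESPONSE_STASH`, one of the compute dyncfgs consulted elsewhere in this
    /// module):
    ///
    /// ```ignore
    /// let enabled = ENABLE_PEEK_RESPONSE_STASH.get(&compute_state.worker_config);
    /// ```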
146    pub worker_config: Rc<ConfigSet>,
147
148    /// Collections awaiting a `Schedule` instruction from the controller.
149    ///
150    /// Each entry stores a reference to a token that can be dropped to unsuspend the collection's
151    /// dataflow. Multiple collections can reference the same token if they are exported by the
152    /// same dataflow.
153    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,
154
155    /// When this replica/cluster is in read-only mode it must not make any
156    /// changes to external state. This flag can only be changed by a
157    /// [ComputeCommand::AllowWrites].
158    ///
159    /// Everything running on this replica/cluster must obey this flag. At the
160    /// time of writing the only part that does so is `persist_sink`.
161    ///
162    /// NOTE: In the future, we might want a more complicated flag, for example
163    /// something that tells us after which timestamp we are allowed to write.
164    /// In this first version we are keeping things as simple as possible!
165    pub read_only_rx: watch::Receiver<bool>,
166
167    /// Send-side for read-only state.
168    pub read_only_tx: watch::Sender<bool>,
169
170    /// Interval at which to perform server maintenance tasks. Set to a zero interval to
171    /// perform maintenance with every `step_or_park` invocation.
172    pub server_maintenance_interval: Duration,
173
174    /// The [`mz_ore::now::SYSTEM_TIME`] at which the replica was started.
175    ///
176    /// Used to compute `replica_expiration`.
177    pub init_system_time: EpochMillis,
178
179    /// The maximum time for which the replica is expected to live. If not empty, dataflows in the
180    /// replica can drop diffs associated with timestamps beyond the replica expiration.
181    /// The replica will panic if such dataflows are not dropped before the replica has expired.
182    pub replica_expiration: Antichain<Timestamp>,
183}
184
185impl ComputeState {
186    /// Construct a new `ComputeState`.
187    pub fn new(
188        persist_clients: Arc<PersistClientCache>,
189        txns_ctx: TxnsContext,
190        metrics: WorkerMetrics,
191        tracing_handle: Arc<TracingHandle>,
192        context: ComputeInstanceContext,
193    ) -> Self {
194        let traces = TraceManager::new(metrics.clone());
195        let command_history = ComputeCommandHistory::new(metrics.for_history());
196
197        // We always initialize as read_only=true. Only when we're explicitly
198        // allowed do we switch to doing writes.
199        let (read_only_tx, read_only_rx) = watch::channel(true);
200
201        Self {
202            collections: Default::default(),
203            dropped_collections: Default::default(),
204            traces,
205            subscribe_response_buffer: Default::default(),
206            copy_to_response_buffer: Default::default(),
207            pending_peeks: Default::default(),
208            peek_stash_persist_location: None,
209            compute_logger: None,
210            persist_clients,
211            txns_ctx,
212            command_history,
213            max_result_size: u64::MAX,
214            linear_join_spec: Default::default(),
215            metrics,
216            tracing_handle,
217            context,
218            worker_config: mz_dyncfgs::all_dyncfgs().into(),
219            suspended_collections: Default::default(),
220            read_only_tx,
221            read_only_rx,
222            server_maintenance_interval: Duration::ZERO,
223            init_system_time: mz_ore::now::SYSTEM_TIME(),
224            replica_expiration: Antichain::default(),
225        }
226    }
227
228    /// Return a mutable reference to the identified collection.
229    ///
230    /// Panics if the collection doesn't exist.
231    pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
232        self.collections
233            .get_mut(&id)
234            .expect("collection must exist")
235    }
236
237    /// Construct a new frontier probe for the given input and add it to the state of the given
238    /// collections.
239    ///
240    /// The caller is responsible for attaching the returned probe handle to the respective
241    /// dataflow input stream.
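    ///
    /// A minimal usage sketch (names are illustrative; the probe is typically attached with
    /// timely's `probe_with`):
    ///
    /// ```ignore
    /// let probe = compute_state.input_probe_for(input_id, export_ids.into_iter());
    /// let stream = input_stream.probe_with(&probe);
    /// ```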
242    pub fn input_probe_for(
243        &mut self,
244        input_id: GlobalId,
245        collection_ids: impl Iterator<Item = GlobalId>,
246    ) -> probe::Handle<Timestamp> {
247        let probe = probe::Handle::default();
248        for id in collection_ids {
249            if let Some(collection) = self.collections.get_mut(&id) {
250                collection.input_probes.insert(input_id, probe.clone());
251            }
252        }
253        probe
254    }
255
256    /// Apply the current `worker_config` to the compute state.
257    fn apply_worker_config(&mut self) {
258        use mz_compute_types::dyncfgs::*;
259
260        let config = &self.worker_config;
261
262        self.linear_join_spec = LinearJoinSpec::from_config(config);
263
264        if ENABLE_LGALLOC.get(config) {
265            if let Some(path) = &self.context.scratch_directory {
266                let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
267                let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
268                let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
269                let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
270                let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
271                info!(
272                    ?path,
273                    background_interval=?interval,
274                    clear_bytes,
275                    eager_return,
276                    file_growth_dampener,
277                    local_buffer_bytes,
278                    "enabling lgalloc"
279                );
280                let background_worker_config = lgalloc::BackgroundWorkerConfig {
281                    interval,
282                    clear_bytes,
283                };
284                lgalloc::lgalloc_set_config(
285                    lgalloc::LgAlloc::new()
286                        .enable()
287                        .with_path(path.clone())
288                        .with_background_config(background_worker_config)
289                        .eager_return(eager_return)
290                        .file_growth_dampener(file_growth_dampener)
291                        .local_buffer_bytes(local_buffer_bytes),
292                );
293            } else {
294                debug!("not enabling lgalloc, scratch directory not specified");
295            }
296        } else {
297            info!("disabling lgalloc");
298            lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
299        }
300
301        crate::memory_limiter::apply_limiter_config(config);
302
303        mz_ore::region::ENABLE_LGALLOC_REGION.store(
304            ENABLE_COLUMNATION_LGALLOC.get(config),
305            std::sync::atomic::Ordering::Relaxed,
306        );
307
308        let enable_columnar_lgalloc = ENABLE_COLUMNAR_LGALLOC.get(config);
309        mz_timely_util::containers::set_enable_columnar_lgalloc(enable_columnar_lgalloc);
310
311        // Remember the maintenance interval locally to avoid reading it from the config set on
312        // every server iteration.
313        self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);
314
315        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
316        match overflowing_behavior.parse() {
317            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
318            Err(err) => {
319                error!(
320                    err,
321                    overflowing_behavior, "Invalid value for ore_overflowing_behavior"
322                );
323            }
324        }
325    }
326
327    /// Apply the provided replica expiration `offset` by converting it to a frontier relative to
328    /// the replica's initialization system time.
329    ///
330    /// Only expected to be called once when creating the instance. Guards against calling it
331    /// multiple times by checking if the local expiration time is set.
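    ///
    /// For example, an offset of 30 days yields an expiration frontier containing the single
    /// timestamp `init_system_time + 30 * 24 * 60 * 60 * 1000` (milliseconds).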
332    pub fn apply_expiration_offset(&mut self, offset: Duration) {
333        if self.replica_expiration.is_empty() {
334            let offset: EpochMillis = offset
335                .as_millis()
336                .try_into()
337                .expect("duration must fit within u64");
338            let replica_expiration_millis = self.init_system_time + offset;
339            let replica_expiration = Timestamp::from(replica_expiration_millis);
340
341            info!(
342                offset = %offset,
343                replica_expiration_millis = %replica_expiration_millis,
344                replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
345                "setting replica expiration",
346            );
347            self.replica_expiration = Antichain::from_elem(replica_expiration);
348
349            // Record the replica expiration in the metrics.
350            self.metrics
351                .replica_expiration_timestamp_seconds
352                .set(replica_expiration.into());
353        }
354    }
355
356    /// Returns the cc or non-cc version of "dataflow_max_inflight_bytes", as
357    /// appropriate to this replica.
358    pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
359        use mz_compute_types::dyncfgs::{
360            DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
361        };
362
363        if self.persist_clients.cfg.is_cc_active {
364            DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
365        } else {
366            DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
367        }
368    }
369}
370
371/// A wrapper around [ComputeState] with a live timely worker and response channel.
372pub(crate) struct ActiveComputeState<'a, A: Allocate> {
373    /// The underlying Timely worker.
374    pub timely_worker: &'a mut TimelyWorker<A>,
375    /// The compute state itself.
376    pub compute_state: &'a mut ComputeState,
377    /// The channel over which frontier information is reported.
378    pub response_tx: &'a mut ResponseSender,
379}
380
381/// A token that keeps a sink alive.
382pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);
383
384impl SinkToken {
385    /// Create a new `SinkToken`.
386    pub fn new(t: Box<dyn Any>) -> Self {
387        Self(t)
388    }
389}
390
391impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
392    /// Entrypoint for applying a compute command.
393    #[mz_ore::instrument(level = "debug")]
394    pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
395        use ComputeCommand::*;
396
397        self.compute_state.command_history.push(cmd.clone());
398
399        // Record the command duration, per worker and command kind.
400        let timer = self
401            .compute_state
402            .metrics
403            .handle_command_duration_seconds
404            .for_command(&cmd)
405            .start_timer();
406
407        match cmd {
408            Hello { .. } => panic!("Hello must be captured before"),
409            CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
410            InitializationComplete => (),
411            UpdateConfiguration(params) => self.handle_update_configuration(*params),
412            CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
413            Schedule(id) => self.handle_schedule(id),
414            AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
415            Peek(peek) => {
416                peek.otel_ctx.attach_as_parent();
417                self.handle_peek(*peek)
418            }
419            CancelPeek { uuid } => self.handle_cancel_peek(uuid),
420            AllowWrites => {
421                self.compute_state
422                    .read_only_tx
423                    .send(false)
424                    .expect("we're holding one other end");
425                self.compute_state.persist_clients.cfg().enable_compaction();
426            }
427        }
428
429        timer.observe_duration();
430    }
431
432    fn handle_create_instance(&mut self, config: InstanceConfig) {
433        // Ensure the state is consistent with the config before we initialize anything.
434        self.compute_state.apply_worker_config();
435        if let Some(offset) = config.expiration_offset {
436            self.compute_state.apply_expiration_offset(offset);
437        }
438
439        self.initialize_logging(config.logging);
440
441        self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
442    }
443
444    fn handle_update_configuration(&mut self, params: ComputeParameters) {
445        info!("Applying configuration update: {params:?}");
446
447        let ComputeParameters {
448            workload_class,
449            max_result_size,
450            tracing,
451            grpc_client: _grpc_client,
452            dyncfg_updates,
453        } = params;
454
455        if let Some(v) = workload_class {
456            self.compute_state.metrics.set_workload_class(v);
457        }
458        if let Some(v) = max_result_size {
459            self.compute_state.max_result_size = v;
460        }
461
462        tracing.apply(self.compute_state.tracing_handle.as_ref());
463
464        dyncfg_updates.apply(&self.compute_state.worker_config);
465        self.compute_state
466            .persist_clients
467            .cfg()
468            .apply_from(&dyncfg_updates);
469
470        // Note: We're only updating mz_metrics from the compute state here, but not from the
471        // equivalent storage state. This is because they're running on the same process and
472        // share the metrics.
473        mz_metrics::update_dyncfg(&dyncfg_updates);
474
475        self.compute_state.apply_worker_config();
476    }
477
478    fn handle_create_dataflow(
479        &mut self,
480        dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
481    ) {
482        let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());
483        let as_of = dataflow.as_of.clone().unwrap();
484
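        // Derive the dataflow expiration (if any) from the dataflow's time dependence and the
        // replica expiration.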
485        let dataflow_expiration = dataflow
486            .time_dependence
487            .as_ref()
488            .map(|time_dependence| {
489                self.determine_dataflow_expiration(time_dependence, &dataflow.until)
490            })
491            .unwrap_or_default();
492
493        // Add the dataflow expiration to `until`.
494        let until = dataflow.until.meet(&dataflow_expiration);
495
496        if dataflow.is_transient() {
497            debug!(
498                name = %dataflow.debug_name,
499                import_ids = %dataflow.display_import_ids(),
500                export_ids = %dataflow.display_export_ids(),
501                as_of = ?as_of.elements(),
502                time_dependence = ?dataflow.time_dependence,
503                expiration = ?dataflow_expiration.elements(),
504                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
505                plan_until = ?dataflow.until.elements(),
506                until = ?until.elements(),
507                "creating dataflow",
508            );
509        } else {
510            info!(
511                name = %dataflow.debug_name,
512                import_ids = %dataflow.display_import_ids(),
513                export_ids = %dataflow.display_export_ids(),
514                as_of = ?as_of.elements(),
515                time_dependence = ?dataflow.time_dependence,
516                expiration = ?dataflow_expiration.elements(),
517                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
518                plan_until = ?dataflow.until.elements(),
519                until = ?until.elements(),
520                "creating dataflow",
521            );
522        };
523
524        let subscribe_copy_ids: BTreeSet<_> = dataflow
525            .subscribe_ids()
526            .chain(dataflow.copy_to_ids())
527            .collect();
528
529        // Initialize compute and logging state for each object.
530        for object_id in dataflow.export_ids() {
531            let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
532            let metrics = self.compute_state.metrics.for_collection(object_id);
533            let mut collection = CollectionState::new(
534                Rc::clone(&dataflow_index),
535                is_subscribe_or_copy,
536                as_of.clone(),
537                metrics,
538            );
539
540            if let Some(logger) = self.compute_state.compute_logger.clone() {
541                let logging = CollectionLogging::new(
542                    object_id,
543                    logger,
544                    *dataflow_index,
545                    dataflow.import_ids(),
546                );
547                collection.logging = Some(logging);
548            }
549
550            collection.reset_reported_frontiers(ReportedFrontier::NotReported {
551                lower: as_of.clone(),
552            });
553
554            let existing = self.compute_state.collections.insert(object_id, collection);
555            if existing.is_some() {
556                error!(
557                    id = ?object_id,
558                    "existing collection for newly created dataflow",
559                );
560            }
561        }
562
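        // Keep the dataflow suspended until the controller has scheduled all of its exported
        // collections (see `handle_schedule`); every export shares the same suspension token.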
563        let (start_signal, suspension_token) = StartSignal::new();
564        for id in dataflow.export_ids() {
565            self.compute_state
566                .suspended_collections
567                .insert(id, Rc::clone(&suspension_token));
568        }
569
570        crate::render::build_compute_dataflow(
571            self.timely_worker,
572            self.compute_state,
573            dataflow,
574            start_signal,
575            until,
576            dataflow_expiration,
577        );
578    }
579
580    fn handle_schedule(&mut self, id: GlobalId) {
581        // A `Schedule` command instructs us to begin dataflow computation for a collection, so
582        // we should unsuspend it by dropping the corresponding suspension token. Note that a
583        // dataflow can export multiple collections and they all share one suspension token, so the
584        // computation of a dataflow will only start once all its exported collections have been
585        // scheduled.
586        let suspension_token = self.compute_state.suspended_collections.remove(&id);
587        drop(suspension_token);
588    }
589
590    fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
591        if frontier.is_empty() {
592            // Indicates that we may drop `id`, as there are no more valid times to read.
593            self.drop_collection(id);
594        } else {
595            self.compute_state
596                .traces
597                .allow_compaction(id, frontier.borrow());
598        }
599    }
600
601    #[mz_ore::instrument(level = "debug")]
602    fn handle_peek(&mut self, peek: Peek) {
603        let pending = match &peek.target {
604            PeekTarget::Index { id } => {
605                // Acquire a copy of the trace suitable for fulfilling the peek.
606                let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
607                PendingPeek::index(peek, trace_bundle)
608            }
609            PeekTarget::Persist { metadata, .. } => {
610                let metadata = metadata.clone();
611                PendingPeek::persist(
612                    peek,
613                    Arc::clone(&self.compute_state.persist_clients),
614                    metadata,
615                    usize::cast_from(self.compute_state.max_result_size),
616                    self.timely_worker,
617                )
618            }
619        };
620
621        // Log the receipt of the peek.
622        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
623            logger.log(&pending.as_log_event(true));
624        }
625
626        self.process_peek(&mut Antichain::new(), pending);
627    }
628
629    fn handle_cancel_peek(&mut self, uuid: Uuid) {
630        if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
631            self.send_peek_response(peek, PeekResponse::Canceled);
632        }
633    }
634
635    /// Arrange for the given collection to be dropped.
636    ///
637    /// Collection dropping occurs in three phases:
638    ///
639    ///  1. This method removes the collection from the [`ComputeState`] and drops its
640    ///     [`CollectionState`], including its held dataflow tokens. It then adds the dropped
641    ///     collection to `dropped_collections`.
642    ///  2. The next step of the Timely worker lets the source operators observe the token drops
643    ///     and shut themselves down.
644    ///  3. `report_dropped_collections` removes the entry from `dropped_collections` and emits any
645    ///     outstanding final responses required by the compute protocol.
646    ///
647    /// These steps ensure that we don't report a collection as dropped to the controller before it
648    /// has stopped reading from its inputs. Doing so would allow the controller to release its
649    /// read holds on the inputs, which could lead to panics from the replica trying to read
650    /// already compacted times.
651    fn drop_collection(&mut self, id: GlobalId) {
652        let collection = self
653            .compute_state
654            .collections
655            .remove(&id)
656            .expect("dropped untracked collection");
657
658        // If this collection is an index, remove its trace.
659        self.compute_state.traces.remove(&id);
660        // If the collection is unscheduled, remove it from the list of waiting collections.
661        self.compute_state.suspended_collections.remove(&id);
662
663        // Drop the dataflow, if all its exports have been dropped.
664        if let Ok(index) = Rc::try_unwrap(collection.dataflow_index) {
665            self.timely_worker.drop_dataflow(index);
666        }
667
668        // Remember the collection as dropped, for emission of outstanding final compute responses.
669        let dropped = DroppedCollection {
670            reported_frontiers: collection.reported_frontiers,
671            is_subscribe_or_copy: collection.is_subscribe_or_copy,
672        };
673        self.compute_state.dropped_collections.push((id, dropped));
674    }
675
676    /// Initializes timely dataflow logging and publishes as a view.
677    pub fn initialize_logging(&mut self, config: LoggingConfig) {
678        if self.compute_state.compute_logger.is_some() {
679            panic!("dataflow server has already initialized logging");
680        }
681
682        let LoggingTraces {
683            traces,
684            dataflow_index,
685            compute_logger: logger,
686        } = logging::initialize(self.timely_worker, &config);
687
688        let dataflow_index = Rc::new(dataflow_index);
689        let mut log_index_ids = config.index_logs;
690        for (log, trace) in traces {
691            // Install trace as maintained index.
692            let id = log_index_ids
693                .remove(&log)
694                .expect("`logging::initialize` does not invent logs");
695            self.compute_state.traces.set(id, trace);
696
697            // Initialize compute and logging state for the logging index.
698            let is_subscribe_or_copy = false;
699            let as_of = Antichain::from_elem(Timestamp::MIN);
700            let metrics = self.compute_state.metrics.for_collection(id);
701            let mut collection = CollectionState::new(
702                Rc::clone(&dataflow_index),
703                is_subscribe_or_copy,
704                as_of,
705                metrics,
706            );
707
708            let logging =
709                CollectionLogging::new(id, logger.clone(), *dataflow_index, std::iter::empty());
710            collection.logging = Some(logging);
711
712            let existing = self.compute_state.collections.insert(id, collection);
713            if existing.is_some() {
714                error!(
715                    id = ?id,
716                    "existing collection for newly initialized logging export",
717                );
718            }
719        }
720
721        // Sanity check.
722        assert!(
723            log_index_ids.is_empty(),
724            "failed to create requested logging indexes: {log_index_ids:?}",
725        );
726
727        self.compute_state.compute_logger = Some(logger);
728    }
729
730    /// Send progress information to the controller.
731    pub fn report_frontiers(&mut self) {
732        let mut responses = Vec::new();
733
734        // Maintain a single allocation for `new_frontier` to avoid allocating on every iteration.
735        let mut new_frontier = Antichain::new();
736
737        for (&id, collection) in self.compute_state.collections.iter_mut() {
738            // The compute protocol does not allow `Frontiers` responses for subscribe and copy-to
739            // collections (database-issues#4701).
740            if collection.is_subscribe_or_copy {
741                continue;
742            }
743
744            let reported = collection.reported_frontiers();
745
746            // Collect the write frontier and check for progress.
747            new_frontier.clear();
748            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
749                assert!(
750                    collection.sink_write_frontier.is_none(),
751                    "collection {id} has multiple frontiers"
752                );
753                traces.oks_mut().read_upper(&mut new_frontier);
754            } else if let Some(frontier) = &collection.sink_write_frontier {
755                new_frontier.clone_from(&frontier.borrow());
756            } else {
757                error!(id = ?id, "collection without write frontier");
758                continue;
759            }
760            let new_write_frontier = reported
761                .write_frontier
762                .allows_reporting(&new_frontier)
763                .then(|| new_frontier.clone());
764
765            // Collect the output frontier and check for progress.
766            //
767            // By default, the output frontier equals the write frontier (which is still stored in
768            // `new_frontier`). If the collection provides a compute frontier, we construct the
769            // output frontier by taking the meet of write and compute frontier, to avoid:
770            //  * reporting progress through times we have not yet written
771            //  * reporting progress through times we have not yet fully processed, for
772            //    collections that jump their write frontiers into the future
773            if let Some(probe) = &collection.compute_probe {
774                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
775            }
776            let new_output_frontier = reported
777                .output_frontier
778                .allows_reporting(&new_frontier)
779                .then(|| new_frontier.clone());
780
781            // Collect the input frontier and check for progress.
782            new_frontier.clear();
783            for probe in collection.input_probes.values() {
784                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
785            }
786            let new_input_frontier = reported
787                .input_frontier
788                .allows_reporting(&new_frontier)
789                .then(|| new_frontier.clone());
790
791            if let Some(frontier) = &new_write_frontier {
792                collection
793                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
794            }
795            if let Some(frontier) = &new_input_frontier {
796                collection
797                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
798            }
799            if let Some(frontier) = &new_output_frontier {
800                collection
801                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
802            }
803
804            let response = FrontiersResponse {
805                write_frontier: new_write_frontier,
806                input_frontier: new_input_frontier,
807                output_frontier: new_output_frontier,
808            };
809            if response.has_updates() {
810                responses.push((id, response));
811            }
812        }
813
814        for (id, frontiers) in responses {
815            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
816        }
817    }
818
819    /// Report dropped collections to the controller.
820    pub fn report_dropped_collections(&mut self) {
821        let dropped_collections = std::mem::take(&mut self.compute_state.dropped_collections);
822
823        for (id, collection) in dropped_collections {
824            // The compute protocol requires us to send a `Frontiers` response with empty frontiers
825            // when a collection was dropped, unless:
826            //  * The frontier was already reported as empty previously, or
827            //  * The collection is a subscribe or copy-to.
828
829            if collection.is_subscribe_or_copy {
830                continue;
831            }
832
833            let reported = collection.reported_frontiers;
834            let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
835            let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
836            let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);
837
838            let frontiers = FrontiersResponse {
839                write_frontier,
840                input_frontier,
841                output_frontier,
842            };
843            if frontiers.has_updates() {
844                self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
845            }
846        }
847    }
848
849    /// Report per-worker metrics.
850    pub(crate) fn report_metrics(&self) {
851        if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
852            let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
853            let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
854            let remaining = expiration - now;
855            self.compute_state
856                .metrics
857                .replica_expiration_remaining_seconds
858                .set(remaining)
859        }
860    }
861
862    /// Either complete the peek (and send the response) or put it in the pending set.
863    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
864        let response = match &mut peek {
865            PendingPeek::Index(peek) => {
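                // The peek stash can only be used for peeks whose finishing can be applied to a
                // streamed result.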
866                let peek_stash_eligible = peek
867                    .peek
868                    .finishing
869                    .is_streamable(peek.peek.result_desc.arity());
870
871                let peek_stash_enabled = {
872                    let enabled = ENABLE_PEEK_RESPONSE_STASH.get(&self.compute_state.worker_config);
873                    let peek_persist_stash_available =
874                        self.compute_state.peek_stash_persist_location.is_some();
875                    if !peek_persist_stash_available && enabled {
876                        tracing::error!(
877                            "missing peek_stash_persist_location but peek stash is enabled"
878                        );
879                    }
880                    enabled && peek_persist_stash_available
881                };
882
883                let peek_stash_threshold_bytes =
884                    PEEK_RESPONSE_STASH_THRESHOLD_BYTES.get(&self.compute_state.worker_config);
885
886                match peek.seek_fulfillment(
887                    upper,
888                    self.compute_state.max_result_size,
889                    peek_stash_enabled && peek_stash_eligible,
890                    peek_stash_threshold_bytes,
891                ) {
892                    PeekStatus::Ready(result) => Some(result),
893                    PeekStatus::NotReady => None,
894                    PeekStatus::UsePeekStash => {
895                        let _span =
896                            span!(parent: &peek.span, Level::DEBUG, "process_stash_peek").entered();
897
898                        let peek_stash_batch_max_runs = PEEK_RESPONSE_STASH_BATCH_MAX_RUNS
899                            .get(&self.compute_state.worker_config);
900
901                        let stash_task = peek_stash::StashingPeek::start_upload(
902                            Arc::clone(&self.compute_state.persist_clients),
903                            self.compute_state
904                                .peek_stash_persist_location
905                                .as_ref()
906                                .expect("verified above"),
907                            peek.peek.clone(),
908                            peek.trace_bundle.clone(),
909                            peek_stash_batch_max_runs,
910                        );
911
912                        self.compute_state
913                            .pending_peeks
914                            .insert(peek.peek.uuid, PendingPeek::Stash(stash_task));
915                        return;
916                    }
917                }
918            }
919            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
920                self.compute_state
921                    .metrics
922                    .persist_peek_seconds
923                    .observe(duration.as_secs_f64());
924                result
925            }),
926            PendingPeek::Stash(stashing_peek) => {
927                let num_batches = PEEK_STASH_NUM_BATCHES.get(&self.compute_state.worker_config);
928                let batch_size = PEEK_STASH_BATCH_SIZE.get(&self.compute_state.worker_config);
929                stashing_peek.pump_rows(num_batches, batch_size);
930
931                if let Ok((response, duration)) = stashing_peek.result.try_recv() {
932                    self.compute_state
933                        .metrics
934                        .stashed_peek_seconds
935                        .observe(duration.as_secs_f64());
936                    tracing::trace!(?stashing_peek.peek, ?duration, "finished stashing peek response in persist");
937
938                    Some(response)
939                } else {
940                    None
941                }
942            }
943        };
944
945        if let Some(response) = response {
946            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek_response").entered();
947            self.send_peek_response(peek, response)
948        } else {
949            let uuid = peek.peek().uuid;
950            self.compute_state.pending_peeks.insert(uuid, peek);
951        }
952    }
953
954    /// Scan pending peeks and attempt to retire each.
955    pub fn process_peeks(&mut self) {
956        let mut upper = Antichain::new();
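        // Take the pending peeks; `process_peek` re-inserts any that cannot be retired yet.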
957        let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
958        for (_uuid, peek) in pending_peeks {
959            self.process_peek(&mut upper, peek);
960        }
961    }
962
963    /// Sends a response for this peek's resolution to the coordinator.
964    ///
965    /// Note that this function takes ownership of the `PendingPeek`, which is
966    /// meant to prevent multiple responses to the same peek.
967    #[mz_ore::instrument(level = "debug")]
968    fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
969        let log_event = peek.as_log_event(false);
970        // Respond with the response.
971        self.send_compute_response(ComputeResponse::PeekResponse(
972            peek.peek().uuid,
973            response,
974            OpenTelemetryContext::obtain(),
975        ));
976
977        // Log responding to the peek request.
978        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
979            logger.log(&log_event);
980        }
981    }
982
983    /// Scan the shared subscribe response buffer, and forward results along.
984    pub fn process_subscribes(&mut self) {
985        let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
986        for (sink_id, mut response) in subscribe_responses.drain(..) {
987            // Update frontier logging for this subscribe.
988            if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
989                let new_frontier = match &response {
990                    SubscribeResponse::Batch(b) => b.upper.clone(),
991                    SubscribeResponse::DroppedAt(_) => Antichain::new(),
992                };
993
994                let reported = collection.reported_frontiers();
995                assert!(
996                    reported.write_frontier.allows_reporting(&new_frontier),
997                    "subscribe write frontier regression: {:?} -> {:?}",
998                    reported.write_frontier,
999                    new_frontier,
1000                );
1001                assert!(
1002                    reported.input_frontier.allows_reporting(&new_frontier),
1003                    "subscribe input frontier regression: {:?} -> {:?}",
1004                    reported.input_frontier,
1005                    new_frontier,
1006                );
1007
1008                collection
1009                    .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
1010                collection
1011                    .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
1012                collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
1013            } else {
1014                // Presumably tracking state for this subscribe was already dropped by
1015                // `drop_collection`. There is nothing left to do for logging.
1016            }
1017
1018            response
1019                .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
1020            self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
1021        }
1022    }
1023
1024    /// Scan the shared copy to response buffer, and forward results along.
1025    pub fn process_copy_tos(&self) {
1026        let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
1027        for (sink_id, response) in responses.drain(..) {
1028            self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
1029        }
1030    }
1031
1032    /// Send a response to the coordinator.
1033    fn send_compute_response(&self, response: ComputeResponse) {
1034        // Ignore send errors because the coordinator is free to ignore our
1035        // responses. This happens during shutdown.
1036        let _ = self.response_tx.send(response);
1037    }
1038
1039    /// Checks for dataflow expiration. Panics if we're past the replica expiration time.
1040    pub(crate) fn check_expiration(&self) {
1041        let now = mz_ore::now::SYSTEM_TIME();
1042        if self.compute_state.replica_expiration.less_than(&now.into()) {
1043            let now_datetime = mz_ore::now::to_datetime(now);
1044            let expiration_datetime = self
1045                .compute_state
1046                .replica_expiration
1047                .as_option()
1048                .map(Into::into)
1049                .map(mz_ore::now::to_datetime);
1050
1051            // We error and assert separately to produce structured logs in anything that depends
1052            // on tracing.
1053            error!(
1054                now,
1055                now_datetime = ?now_datetime,
1056                expiration = ?self.compute_state.replica_expiration.elements(),
1057                expiration_datetime = ?expiration_datetime,
1058                "replica expired"
1059            );
1060
1061            // Repeat condition for better error message.
1062            assert!(
1063                !self.compute_state.replica_expiration.less_than(&now.into()),
1064                "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
1065                self.compute_state.replica_expiration.elements(),
1066            );
1067        }
1068    }
1069
1070    /// Returns the dataflow expiration, i.e., the timestamp beyond which diffs can be
1071    /// dropped.
1072    ///
1073    /// Returns an empty frontier if `replica_expiration` is unset or matches conditions under
1074    /// which dataflow expiration should be disabled.
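    ///
    /// As a sketch: for a collection that follows wall-clock time, a replica expiration of `E`
    /// yields a dataflow expiration of `E + 1`, unless the dataflow's `until` is less than or
    /// equal to `E + 1`, in which case the result is empty.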
1075    pub fn determine_dataflow_expiration(
1076        &self,
1077        time_dependence: &TimeDependence,
1078        until: &Antichain<mz_repr::Timestamp>,
1079    ) -> Antichain<mz_repr::Timestamp> {
1080        // Evaluate time dependence with respect to the expiration time.
1081        // * Step time forward to ensure the expiration time is different from the moment a dataflow
1082        //   can legitimately jump to.
1083        // * We cannot expire dataflows with an `until` that is less than or equal to the expiration time.
1084        let iter = self
1085            .compute_state
1086            .replica_expiration
1087            .iter()
1088            .filter_map(|t| time_dependence.apply(*t))
1089            .filter_map(|t| mz_repr::Timestamp::try_step_forward(&t))
1090            .filter(|expiration| !until.less_equal(expiration));
1091        Antichain::from_iter(iter)
1092    }
1093}
1094
1095/// A peek against either an index or a Persist collection.
1096///
1097/// Note that `PendingPeek` intentionally does not implement or derive `Clone`,
1098/// as each `PendingPeek` is meant to be dropped after it's responded to.
1099pub enum PendingPeek {
1100    /// A peek against an index. (Possibly a temporary index created for the purpose.)
1101    Index(IndexPeek),
1102    /// A peek against a Persist-backed collection.
1103    Persist(PersistPeek),
1104    /// A peek against an index that is being stashed in the peek stash by an
1105    /// async background task.
1106    Stash(peek_stash::StashingPeek),
1107}
1108
1109impl PendingPeek {
1110    /// Produces a corresponding log event.
1111    pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
1112        let peek = self.peek();
1113        let (id, peek_type) = match &peek.target {
1114            PeekTarget::Index { id } => (*id, logging::compute::PeekType::Index),
1115            PeekTarget::Persist { id, .. } => (*id, logging::compute::PeekType::Persist),
1116        };
1117        let uuid = peek.uuid.into_bytes();
1118        ComputeEvent::Peek(PeekEvent {
1119            id,
1120            time: peek.timestamp,
1121            uuid,
1122            peek_type,
1123            installed,
1124        })
1125    }
1126
1127    fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
1128        let empty_frontier = Antichain::new();
1129        let timestamp_frontier = Antichain::from_elem(peek.timestamp);
1130        trace_bundle
1131            .oks_mut()
1132            .set_logical_compaction(timestamp_frontier.borrow());
1133        trace_bundle
1134            .errs_mut()
1135            .set_logical_compaction(timestamp_frontier.borrow());
1136        trace_bundle
1137            .oks_mut()
1138            .set_physical_compaction(empty_frontier.borrow());
1139        trace_bundle
1140            .errs_mut()
1141            .set_physical_compaction(empty_frontier.borrow());
1142
1143        PendingPeek::Index(IndexPeek {
1144            peek,
1145            trace_bundle,
1146            span: tracing::Span::current(),
1147        })
1148    }
1149
1150    fn persist<A: Allocate>(
1151        peek: Peek,
1152        persist_clients: Arc<PersistClientCache>,
1153        metadata: CollectionMetadata,
1154        max_result_size: usize,
1155        timely_worker: &TimelyWorker<A>,
1156    ) -> Self {
1157        let active_worker = {
1158            // Choose the worker that does the actual peek arbitrarily but consistently.
1159            let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
1160            chosen_index == timely_worker.index()
1161        };
1162        let activator = timely_worker.sync_activator_for([].into());
1163        let peek_uuid = peek.uuid;
1164
1165        let (result_tx, result_rx) = oneshot::channel();
1166        let timestamp = peek.timestamp;
1167        let mfp_plan = peek.map_filter_project.clone();
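        // The finished result needs at most `limit + offset` rows (or all rows, if there is no
        // limit).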
1168        let max_results_needed = peek
1169            .finishing
1170            .limit
1171            .map(|l| usize::cast_from(u64::from(l)))
1172            .unwrap_or(usize::MAX)
1173            + peek.finishing.offset;
1174        let order_by = peek.finishing.order_by.clone();
1175
1176        // Persist peeks can include at most one literal constraint.
1177        let literal_constraint = peek
1178            .literal_constraints
1179            .clone()
1180            .map(|rows| rows.into_element());
1181
1182        let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
1183            let start = Instant::now();
1184            let result = if active_worker {
1185                PersistPeek::do_peek(
1186                    &persist_clients,
1187                    metadata,
1188                    timestamp,
1189                    literal_constraint,
1190                    mfp_plan,
1191                    max_result_size,
1192                    max_results_needed,
1193                )
1194                .await
1195            } else {
1196                Ok(vec![])
1197            };
1198            let result = match result {
1199                Ok(rows) => PeekResponse::Rows(RowCollection::new(rows, &order_by)),
1200                Err(e) => PeekResponse::Error(e.to_string()),
1201            };
1202            match result_tx.send((result, start.elapsed())) {
1203                Ok(()) => {}
1204                Err((_result, elapsed)) => {
1205                    debug!(duration =? elapsed, "dropping result for cancelled peek {peek_uuid}")
1206                }
1207            }
1208            match activator.activate() {
1209                Ok(()) => {}
1210                Err(_) => {
1211                    debug!("unable to wake timely after completed peek {peek_uuid}");
1212                }
1213            }
1214        });
1215        PendingPeek::Persist(PersistPeek {
1216            peek,
1217            _abort_handle: task_handle.abort_on_drop(),
1218            result: result_rx,
1219            span: tracing::Span::current(),
1220        })
1221    }
1222
1223    fn span(&self) -> &tracing::Span {
1224        match self {
1225            PendingPeek::Index(p) => &p.span,
1226            PendingPeek::Persist(p) => &p.span,
1227            PendingPeek::Stash(p) => &p.span,
1228        }
1229    }
1230
1231    pub(crate) fn peek(&self) -> &Peek {
1232        match self {
1233            PendingPeek::Index(p) => &p.peek,
1234            PendingPeek::Persist(p) => &p.peek,
1235            PendingPeek::Stash(p) => &p.peek,
1236        }
1237    }
1238}
1239
1240/// An in-progress Persist peek.
1241///
1242/// Note that `PendingPeek` intentionally does not implement or derive `Clone`,
1243/// as each `PendingPeek` is meant to be dropped after it's responded to.
1244pub struct PersistPeek {
1245    pub(crate) peek: Peek,
1246    /// A background task that's responsible for producing the peek results.
1247    /// If we're no longer interested in the results, we abort the task.
1248    _abort_handle: AbortOnDropHandle<()>,
1249    /// The result of the background task, eventually.
1250    result: oneshot::Receiver<(PeekResponse, Duration)>,
1251    /// The `tracing::Span` tracking this peek's operation
1252    span: tracing::Span,
1253}
1254
1255impl PersistPeek {
1256    async fn do_peek(
1257        persist_clients: &PersistClientCache,
1258        metadata: CollectionMetadata,
1259        as_of: Timestamp,
1260        literal_constraint: Option<Row>,
1261        mfp_plan: SafeMfpPlan,
1262        max_result_size: usize,
1263        mut limit_remaining: usize,
1264    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
1265        let client = persist_clients
1266            .open(metadata.persist_location)
1267            .await
1268            .map_err(|e| e.to_string())?;
1269
1270        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
1271            .open_leased_reader(
1272                metadata.data_shard,
1273                Arc::new(metadata.relation_desc.clone()),
1274                Arc::new(UnitSchema),
1275                Diagnostics::from_purpose("persist::peek"),
1276                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
1277            )
1278            .await
1279            .map_err(|e| e.to_string())?;
1280
1281        // If we are using txn-wal for this collection, then the upper might
1282        // be advanced lazily and we have to go through txn-wal for reads.
1283        //
1284        // TODO: If/when we have a process-wide TxnsRead worker for clusterd,
1285        // use it here (instead of opening a new TxnsCache) to save a persist
1286        // reader registration and some txns shard read traffic.
1287        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
1288            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
1289        } else {
1290            None
1291        };
1292
1293        let metrics = client.metrics();
1294
1295        let mut cursor = StatsCursor::new(
1296            &mut reader,
1297            txns_read.as_mut(),
1298            metrics,
1299            &mfp_plan,
1300            &metadata.relation_desc,
1301            Antichain::from_elem(as_of),
1302        )
1303        .await
1304        .map_err(|since| {
1305            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
1306        })?;
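        // For example (illustrative numbers only): if compaction has advanced the
        // shard's since to {10}, a peek with `as_of = 5` can no longer be served,
        // because the snapshot at time 5 has been compacted away; we surface the
        // error constructed above rather than returning incorrect data.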
1307
1308        // Re-used state for processing and building rows.
1309        let mut result = vec![];
1310        let mut datum_vec = DatumVec::new();
1311        let mut row_builder = Row::default();
1312        let arena = RowArena::new();
1313        let mut total_size = 0usize;
1314
1315        let literal_len = match &literal_constraint {
1316            None => 0,
1317            Some(row) => row.iter().count(),
1318        };
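        // The loop below compares each row's first `literal_len` datums against the
        // literal constraint. An illustrative example (hypothetical values): with a
        // constraint of `(1, "a")`, rows whose prefix sorts before it are skipped,
        // rows whose prefix equals it are kept, and the first row whose prefix sorts
        // after it ends the scan; the early `break` relies on the cursor yielding
        // rows in sorted order, so no later row can still match.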
1319
1320        'collect: while limit_remaining > 0 {
1321            let Some(batch) = cursor.next().await else {
1322                break;
1323            };
1324            for (data, _, d) in batch {
1325                let row = data.map_err(|e| e.to_string())?;
1326
1327                if let Some(literal) = &literal_constraint {
1328                    match row.iter().take(literal_len).cmp(literal.iter()) {
1329                        Ordering::Less => continue,
1330                        Ordering::Equal => {}
1331                        Ordering::Greater => break 'collect,
1332                    }
1333                }
1334
1335                let count: usize = d.try_into().map_err(|_| {
1336                    tracing::error!(
1337                        shard = %metadata.data_shard, diff = d, ?row,
1338                        "persist peek encountered negative multiplicities",
1339                    );
1340                    format!(
1341                        "Invalid data in source, \
1342                         saw retractions ({}) for row that does not exist: {:?}",
1343                        -d, row,
1344                    )
1345                })?;
1346                let Some(count) = NonZeroUsize::new(count) else {
1347                    continue;
1348                };
1349                let mut datum_local = datum_vec.borrow_with(&row);
1350                let eval_result = mfp_plan
1351                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
1352                    .map(|row| row.cloned())
1353                    .map_err(|e| e.to_string())?;
1354                if let Some(row) = eval_result {
1355                    total_size = total_size
1356                        .saturating_add(row.byte_len())
1357                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
1358                    if total_size > max_result_size {
1359                        return Err(format!(
1360                            "result exceeds max size of {}",
1361                            ByteSize::b(u64::cast_from(max_result_size))
1362                        ));
1363                    }
1364                    result.push((row, count));
1365                    limit_remaining = limit_remaining.saturating_sub(count.get());
1366                    if limit_remaining == 0 {
1367                        break;
1368                    }
1369                }
1370            }
1371        }
1372
1373        Ok(result)
1374    }
1375}
1376
1377/// An in-progress index-backed peek, and data to eventually fulfill it.
1378pub struct IndexPeek {
1379    peek: Peek,
1380    /// The data from which the trace derives.
1381    trace_bundle: TraceBundle,
1382    /// The `tracing::Span` tracking this peek's operation.
1383    span: tracing::Span,
1384}
1385
1386impl IndexPeek {
1387    /// Attempts to fulfill the peek and reports success.
1388    ///
1389    /// To produce output at `peek.timestamp`, we must be certain that
1390    /// it is no longer changing. A trace guarantees that all future
1391    /// changes will be greater than or equal to an element of `upper`.
1392    ///
1393    /// If an element of `upper` is less than or equal to `peek.timestamp`,
1394    /// then there can be further updates that would change the output.
1395    /// If no element of `upper` is less than or equal to `peek.timestamp`,
1396    /// then for any time `t` less than or equal to `peek.timestamp`, it is
1397    /// not the case that `upper` is less than or equal to that timestamp,
1398    /// and so the result cannot further evolve.
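    ///
    /// A small sketch of the frontier check (illustrative only, not compiled as a
    /// doctest; assumes `Timestamp: From<u64>`):
    ///
    /// ```ignore
    /// let peek_time = Timestamp::from(5u64);
    /// // An upper of {5} may still produce updates at time 5: the peek is not ready.
    /// assert!(Antichain::from_elem(Timestamp::from(5u64)).less_equal(&peek_time));
    /// // An upper of {6} guarantees all future updates are at times >= 6: ready.
    /// assert!(!Antichain::from_elem(Timestamp::from(6u64)).less_equal(&peek_time));
    /// ```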
1399    fn seek_fulfillment(
1400        &mut self,
1401        upper: &mut Antichain<Timestamp>,
1402        max_result_size: u64,
1403        peek_stash_eligible: bool,
1404        peek_stash_threshold_bytes: usize,
1405    ) -> PeekStatus {
1406        self.trace_bundle.oks_mut().read_upper(upper);
1407        if upper.less_equal(&self.peek.timestamp) {
1408            return PeekStatus::NotReady;
1409        }
1410        self.trace_bundle.errs_mut().read_upper(upper);
1411        if upper.less_equal(&self.peek.timestamp) {
1412            return PeekStatus::NotReady;
1413        }
1414
1415        let read_frontier = self.trace_bundle.compaction_frontier();
1416        if !read_frontier.less_equal(&self.peek.timestamp) {
1417            let error = format!(
1418                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
1419                read_frontier.elements(),
1420                self.peek.timestamp,
1421            );
1422            return PeekStatus::Ready(PeekResponse::Error(error));
1423        }
1424
1425        self.collect_finished_data(
1426            max_result_size,
1427            peek_stash_eligible,
1428            peek_stash_threshold_bytes,
1429        )
1430    }
1431
1432    /// Collects data for a known-complete peek, first surfacing any error found in the errs trace.
1433    fn collect_finished_data(
1434        &mut self,
1435        max_result_size: u64,
1436        peek_stash_eligible: bool,
1437        peek_stash_threshold_bytes: usize,
1438    ) -> PeekStatus {
1439        // Check if there exist any errors and, if so, return the first one we
1440        // find.
1441        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
1442        while cursor.key_valid(&storage) {
1443            let mut copies = Diff::ZERO;
1444            cursor.map_times(&storage, |time, diff| {
1445                if time.less_equal(&self.peek.timestamp) {
1446                    copies += diff;
1447                }
1448            });
1449            if copies.is_negative() {
1450                let error = cursor.key(&storage);
1451                tracing::error!(
1452                    target = %self.peek.target.id(), diff = %copies, %error,
1453                    "index peek encountered negative multiplicities in error trace",
1454                );
1455                return PeekStatus::Ready(PeekResponse::Error(format!(
1456                    "Invalid data in source errors, \
1457                    saw retractions ({}) for row that does not exist: {}",
1458                    -copies, error,
1459                )));
1460            }
1461            if copies.is_positive() {
1462                return PeekStatus::Ready(PeekResponse::Error(cursor.key(&storage).to_string()));
1463            }
1464            cursor.step_key(&storage);
1465        }
1466
1467        Self::collect_ok_finished_data(
1468            &self.peek,
1469            self.trace_bundle.oks_mut(),
1470            max_result_size,
1471            peek_stash_eligible,
1472            peek_stash_threshold_bytes,
1473        )
1474    }
1475
1476    /// Collects data for a known-complete peek from the ok stream.
1477    fn collect_ok_finished_data<Tr>(
1478        peek: &Peek<Timestamp>,
1479        oks_handle: &mut Tr,
1480        max_result_size: u64,
1481        peek_stash_eligible: bool,
1482        peek_stash_threshold_bytes: usize,
1483    ) -> PeekStatus
1484    where
1485        for<'a> Tr: TraceReader<
1486                Key<'a>: ToDatumIter + Eq,
1487                KeyOwn = Row,
1488                Val<'a>: ToDatumIter,
1489                TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
1490                DiffGat<'a> = &'a Diff,
1491            >,
1492    {
1493        let max_result_size = usize::cast_from(max_result_size);
1494        let count_byte_size = size_of::<NonZeroUsize>();
1495
1496        // We clone `literal_constraints` here because we don't want to move the constraints
1497        // out of the peek struct, and don't want to modify them in place.
1498        let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
1499            peek.target.id().clone(),
1500            peek.map_filter_project.clone(),
1501            peek.timestamp,
1502            peek.literal_constraints.clone().as_deref_mut(),
1503            oks_handle,
1504        );
1505
1506        // Accumulated `Vec<(row, count)>` results that we are likely to return.
1507        let mut results = Vec::new();
1508        let mut total_size: usize = 0;
1509
1510        // When set, a bound on the number of records we need to return.
1511        // The requirements on the records are driven by the finishing's
1512        // `order_by` field. Further limiting will happen when the results
1513        // are collected, so we don't need to have exactly this many results,
1514        // just at least those results that would have been returned.
1515        let max_results = peek.finishing.num_rows_needed();
1516
1517        let mut l_datum_vec = DatumVec::new();
1518        let mut r_datum_vec = DatumVec::new();
1519
1520        while let Some(row) = peek_iterator.next() {
1521            let row = match row {
1522                Ok(row) => row,
1523                Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)),
1524            };
1525            let (row, copies) = row;
1526            let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");
1527
1528            total_size = total_size
1529                .saturating_add(row.byte_len())
1530                .saturating_add(count_byte_size);
1531            if peek_stash_eligible && total_size > peek_stash_threshold_bytes {
1532                return PeekStatus::UsePeekStash;
1533            }
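            // For example (illustrative numbers only): with a stash threshold of
            // 64 MiB and a max result size of 256 MiB, a stash-eligible 100 MiB
            // result is redirected to the peek result stash rather than sent inline;
            // without stash eligibility, a result above 256 MiB instead fails with
            // the max-size error below.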
1534            if total_size > max_result_size {
1535                return PeekStatus::Ready(PeekResponse::Error(format!(
1536                    "result exceeds max size of {}",
1537                    ByteSize::b(u64::cast_from(max_result_size))
1538                )));
1539            }
1540
1541            results.push((row, copies));
1542
1543            // If we hold many more than `max_results` records, we can thin down
1544            // `results` using `peek.finishing.order_by`.
1545            if let Some(max_results) = max_results {
1546                // We use a threshold twice what we intend, to amortize the work
1547                // across all of the insertions. We could tighten this, but it
1548                // works for the moment.
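                // For example (illustrative numbers only): with `max_results = 100`,
                // we only sort and truncate once `results` holds 200 entries, so the
                // cost of each sort is amortized over at least 100 preceding pushes.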
1549                if results.len() >= 2 * max_results {
1550                    if peek.finishing.order_by.is_empty() {
1551                        results.truncate(max_results);
1552                        return PeekStatus::Ready(PeekResponse::Rows(RowCollection::new(
1553                            results,
1554                            &peek.finishing.order_by,
1555                        )));
1556                    } else {
1557                        // We can sort `results` and then truncate to `max_results`.
1558                        // This has an effect similar to a priority queue, without
1559                        // its interactive dequeueing properties.
1560                        // TODO: Had we left these as `Vec<Datum>` we would avoid
1561                        // the unpacking; we should consider doing that, although
1562                        // it will require a re-pivot of the code to branch on this
1563                        // inner test (as we prefer not to maintain `Vec<Datum>`
1564                        // in the other case).
1565                        results.sort_by(|left, right| {
1566                            let left_datums = l_datum_vec.borrow_with(&left.0);
1567                            let right_datums = r_datum_vec.borrow_with(&right.0);
1568                            mz_expr::compare_columns(
1569                                &peek.finishing.order_by,
1570                                &left_datums,
1571                                &right_datums,
1572                                || left.0.cmp(&right.0),
1573                            )
1574                        });
1575                        let dropped = results.drain(max_results..);
1576                        let dropped_size =
1577                            dropped.into_iter().fold(0, |acc: usize, (row, _count)| {
1578                                acc.saturating_add(row.byte_len().saturating_add(count_byte_size))
1579                            });
1580                        total_size = total_size.saturating_sub(dropped_size);
1581                    }
1582                }
1583            }
1584        }
1585
1586        PeekStatus::Ready(PeekResponse::Rows(RowCollection::new(
1587            results,
1588            &peek.finishing.order_by,
1589        )))
1590    }
1591}
1592
1593/// The status of a pending or ready peek, used to manage control flow while
1594/// fulfilling peeks.
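///
/// An illustrative sketch of how a caller might branch on the status (the match
/// arms' comments are placeholders, not this module's actual handling code):
///
/// ```ignore
/// match status {
///     PeekStatus::NotReady => { /* keep the peek pending and try again later */ }
///     PeekStatus::UsePeekStash => { /* re-run the peek through the stash-backed path */ }
///     PeekStatus::Ready(response) => { /* ship `response` to the controller */ }
/// }
/// ```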
1595enum PeekStatus {
1596    /// The frontiers of the peeked objects have not advanced far enough; the
1597    /// peek is still pending.
1598    NotReady,
1599    /// The result size is above the configured threshold and the peek is
1600    /// eligible for using the peek result stash.
1601    UsePeekStash,
1602    /// The peek result is ready.
1603    Ready(PeekResponse),
1604}
1605
1606/// The frontiers we have reported to the controller for a collection.
1607#[derive(Debug)]
1608struct ReportedFrontiers {
1609    /// The reported write frontier.
1610    write_frontier: ReportedFrontier,
1611    /// The reported input frontier.
1612    input_frontier: ReportedFrontier,
1613    /// The reported output frontier.
1614    output_frontier: ReportedFrontier,
1615}
1616
1617impl ReportedFrontiers {
1618    /// Creates a new `ReportedFrontiers` instance.
1619    fn new() -> Self {
1620        Self {
1621            write_frontier: ReportedFrontier::new(),
1622            input_frontier: ReportedFrontier::new(),
1623            output_frontier: ReportedFrontier::new(),
1624        }
1625    }
1626}
1627
1628/// A frontier we have reported to the controller, or the least frontier we are allowed to report.
1629#[derive(Clone, Debug)]
1630pub enum ReportedFrontier {
1631    /// A frontier has been previously reported.
1632    Reported(Antichain<Timestamp>),
1633    /// No frontier has been reported yet.
1634    NotReported {
1635        /// A lower bound for frontiers that may be reported in the future.
1636        lower: Antichain<Timestamp>,
1637    },
1638}
1639
1640impl ReportedFrontier {
1641    /// Create a new `ReportedFrontier` with the minimum timestamp as its lower bound.
1642    pub fn new() -> Self {
1643        let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
1644        Self::NotReported { lower }
1645    }
1646
1647    /// Whether the reported frontier is the empty frontier.
1648    pub fn is_empty(&self) -> bool {
1649        match self {
1650            Self::Reported(frontier) => frontier.is_empty(),
1651            Self::NotReported { .. } => false,
1652        }
1653    }
1654
1655    /// Whether this `ReportedFrontier` allows reporting the given frontier.
1656    ///
1657    /// A `ReportedFrontier` allows reporting of another frontier if:
1658    ///  * a frontier has been reported and the other frontier is strictly greater, or
1659    ///  * no frontier has been reported yet and the other frontier is greater than or equal to the lower bound.
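    ///
    /// A small sketch of the intended behavior (illustrative only, not compiled as a
    /// doctest; assumes `Timestamp: From<u64>`):
    ///
    /// ```ignore
    /// let reported = ReportedFrontier::Reported(Antichain::from_elem(Timestamp::from(5u64)));
    /// assert!(reported.allows_reporting(&Antichain::from_elem(Timestamp::from(6u64))));
    /// assert!(!reported.allows_reporting(&Antichain::from_elem(Timestamp::from(5u64))));
    ///
    /// // Before anything has been reported, any frontier at or above the minimum
    /// // lower bound may be reported.
    /// let unreported = ReportedFrontier::new();
    /// assert!(unreported.allows_reporting(&Antichain::from_elem(Timestamp::from(0u64))));
    /// ```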
1660    fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
1661        match self {
1662            Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
1663            Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
1664        }
1665    }
1666}
1667
1668/// State maintained for a compute collection.
1669pub struct CollectionState {
1670    /// Tracks the frontiers that have been reported to the controller.
1671    reported_frontiers: ReportedFrontiers,
1672    /// The index of the dataflow computing this collection.
1673    ///
1674    /// Used for dropping the dataflow when the collection is dropped.
1675    /// The Dataflow index is wrapped in an `Rc`s and can be shared between collections, to reflect
1676    /// The dataflow index is wrapped in an `Rc` and can be shared between collections, reflecting
1677    /// the possibility that a single dataflow can export multiple collections.
1678    /// Whether this collection is a subscribe or copy-to.
1679    ///
1680    /// The compute protocol does not allow `Frontiers` responses for subscribe and copy-to
1681    /// collections, so we need to be able to recognize them. This is something we would like to
1682    /// change in the future (database-issues#4701).
1683    pub is_subscribe_or_copy: bool,
1684    /// The collection's initial as-of frontier.
1685    ///
1686    /// Used to determine hydration status.
1687    as_of: Antichain<Timestamp>,
1688
1689    /// A token that should be dropped when this collection is dropped to clean up associated
1690    /// sink state.
1691    ///
1692    /// Only `Some` if the collection is a sink.
1693    pub sink_token: Option<SinkToken>,
1694    /// Frontier of sink writes.
1695    ///
1696    /// Only `Some` if the collection is a sink and *not* a subscribe.
1697    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
1698    /// Frontier probes for every input to the collection.
1699    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
1700    /// A probe reporting the frontier of times through which all collection outputs have been
1701    /// computed (but not necessarily written).
1702    ///
1703    /// `None` for collections with compute frontiers equal to their write frontiers.
1704    pub compute_probe: Option<probe::Handle<Timestamp>>,
1705    /// Logging state maintained for this collection.
1706    logging: Option<CollectionLogging>,
1707    /// Metrics tracked for this collection.
1708    metrics: CollectionMetrics,
1709}
1710
1711impl CollectionState {
1712    fn new(
1713        dataflow_index: Rc<usize>,
1714        is_subscribe_or_copy: bool,
1715        as_of: Antichain<Timestamp>,
1716        metrics: CollectionMetrics,
1717    ) -> Self {
1718        Self {
1719            reported_frontiers: ReportedFrontiers::new(),
1720            dataflow_index,
1721            is_subscribe_or_copy,
1722            as_of,
1723            sink_token: None,
1724            sink_write_frontier: None,
1725            input_probes: Default::default(),
1726            compute_probe: None,
1727            logging: None,
1728            metrics,
1729        }
1730    }
1731
1732    /// Return the frontiers that have been reported to the controller.
1733    fn reported_frontiers(&self) -> &ReportedFrontiers {
1734        &self.reported_frontiers
1735    }
1736
1737    /// Reset all reported frontiers to the given value.
1738    pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
1739        self.reported_frontiers.write_frontier = frontier.clone();
1740        self.reported_frontiers.input_frontier = frontier.clone();
1741        self.reported_frontiers.output_frontier = frontier;
1742    }
1743
1744    /// Set the write frontier that has been reported to the controller.
1745    fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
1746        if let Some(logging) = &mut self.logging {
1747            let time = match &frontier {
1748                ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
1749                ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
1750            };
1751            logging.set_frontier(time);
1752        }
1753
1754        self.reported_frontiers.write_frontier = frontier;
1755    }
1756
1757    /// Set the input frontier that has been reported to the controller.
1758    fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
1759        // Use this opportunity to update our input frontier logging.
1760        if let Some(logging) = &mut self.logging {
1761            for (id, probe) in &self.input_probes {
1762                let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
1763                logging.set_import_frontier(*id, new_time);
1764            }
1765        }
1766
1767        self.reported_frontiers.input_frontier = frontier;
1768    }
1769
1770    /// Set the output frontier that has been reported to the controller.
1771    fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
1772        let already_hydrated = self.hydrated();
1773
1774        self.reported_frontiers.output_frontier = frontier;
1775
1776        if !already_hydrated && self.hydrated() {
1777            if let Some(logging) = &mut self.logging {
1778                logging.set_hydrated();
1779            }
1780            self.metrics.record_collection_hydrated();
1781        }
1782    }
1783
1784    /// Return whether this collection is hydrated.
1785    fn hydrated(&self) -> bool {
1786        match &self.reported_frontiers.output_frontier {
1787            ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
1788            ReportedFrontier::NotReported { .. } => false,
1789        }
1790    }
1791}
1792
1793/// State remembered about a dropped compute collection.
1794///
1795/// This is the subset of the full [`CollectionState`] that survives the invocation of
1796/// `drop_collection`, until it is finally dropped in `report_dropped_collections`. It includes any
1797/// information required to report the dropping of a collection to the controller.
1798///
1799/// Note that this type must _not_ hold any state (such as tokens) whose dropping releases
1800/// resources elsewhere in the system. A `DroppedCollection` for a collection dropped during
1801/// reconciliation might be alive at the same time as the [`CollectionState`] for the re-created
1802/// collection, and if the dropped collection hasn't released all its held resources by the time
1803/// the new one is created, conflicts can ensue.
1804pub struct DroppedCollection {
1805    reported_frontiers: ReportedFrontiers,
1806    is_subscribe_or_copy: bool,
1807}