mz_compute/
compute_state.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Worker-local state for compute timely instances.
7
8use std::any::Any;
9use std::cell::RefCell;
10use std::cmp::Ordering;
11use std::collections::{BTreeMap, BTreeSet};
12use std::num::NonZeroUsize;
13
14use std::rc::Rc;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use bytesize::ByteSize;
19use differential_dataflow::Hashable;
20use differential_dataflow::lattice::Lattice;
21use differential_dataflow::trace::{Cursor, TraceReader};
22use mz_compute_client::logging::LoggingConfig;
23use mz_compute_client::protocol::command::{
24    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
25};
26use mz_compute_client::protocol::history::ComputeCommandHistory;
27use mz_compute_client::protocol::response::{
28    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, SubscribeResponse,
29};
30use mz_compute_types::dataflows::DataflowDescription;
31use mz_compute_types::dyncfgs::{
32    ENABLE_ACTIVE_DATAFLOW_CANCELATION, ENABLE_PEEK_RESPONSE_STASH,
33    PEEK_RESPONSE_STASH_BATCH_MAX_RUNS, PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE,
34    PEEK_STASH_NUM_BATCHES,
35};
36use mz_compute_types::plan::render_plan::RenderPlan;
37use mz_dyncfg::ConfigSet;
38use mz_expr::SafeMfpPlan;
39use mz_expr::row::RowCollection;
40use mz_ore::cast::CastFrom;
41use mz_ore::collections::CollectionExt;
42use mz_ore::metrics::UIntGauge;
43use mz_ore::now::EpochMillis;
44use mz_ore::task::AbortOnDropHandle;
45use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
46use mz_persist_client::Diagnostics;
47use mz_persist_client::cache::PersistClientCache;
48use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
49use mz_persist_client::read::ReadHandle;
50use mz_persist_types::PersistLocation;
51use mz_persist_types::codec_impls::UnitSchema;
52use mz_repr::fixed_length::ToDatumIter;
53use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
54use mz_storage_operators::stats::StatsCursor;
55use mz_storage_types::StorageDiff;
56use mz_storage_types::controller::CollectionMetadata;
57use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
58use mz_storage_types::sources::SourceData;
59use mz_storage_types::time_dependence::TimeDependence;
60use mz_txn_wal::operator::TxnsContext;
61use mz_txn_wal::txn_cache::TxnsCache;
62use timely::communication::Allocate;
63use timely::dataflow::operators::probe;
64use timely::order::PartialOrder;
65use timely::progress::frontier::Antichain;
66use timely::scheduling::Scheduler;
67use timely::worker::Worker as TimelyWorker;
68use tokio::sync::{oneshot, watch};
69use tracing::{Level, debug, error, info, span, warn};
70use uuid::Uuid;
71
72use crate::arrangement::manager::{TraceBundle, TraceManager};
73use crate::logging;
74use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
75use crate::logging::initialize::LoggingTraces;
76use crate::metrics::{CollectionMetrics, WorkerMetrics};
77use crate::render::{LinearJoinSpec, StartSignal};
78use crate::server::{ComputeInstanceContext, ResponseSender};
79
80mod peek_result_iterator;
81mod peek_stash;
82
/// Worker-local state that is maintained across dataflows.
///
/// This state is restricted to the COMPUTE state, the deterministic, idempotent work
/// done between data ingress and egress.
pub struct ComputeState {
    /// State kept for each installed compute collection.
    ///
    /// Each collection has exactly one frontier.
    /// How the frontier is communicated depends on the collection type:
    ///  * Frontiers of indexes are equal to the frontier of their corresponding traces in the
    ///    `TraceManager`.
    ///  * Persist sinks store their current frontier in `CollectionState::sink_write_frontier`.
    ///  * Subscribes report their frontiers through the `subscribe_response_buffer`.
    pub collections: BTreeMap<GlobalId, CollectionState>,
    /// Collections that were recently dropped and whose removal needs to be reported.
    ///
    /// Entries are appended when a collection is dropped in response to an `AllowCompaction`
    /// command with an empty frontier.
    pub dropped_collections: Vec<(GlobalId, DroppedCollection)>,
    /// The traces available for sharing across dataflows.
    pub traces: TraceManager,
    /// Shared buffer with SUBSCRIBE operator instances by which they can respond.
    ///
    /// The entries are pairs of sink identifier (to identify the subscribe instance)
    /// and the response itself.
    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
    /// Shared buffer with S3 oneshot operator instances by which they can respond.
    ///
    /// The entries are pairs of sink identifier (to identify the S3 oneshot instance)
    /// and the response itself.
    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
    /// Peek commands that are awaiting fulfillment, keyed by peek UUID.
    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// The persist location where we can stash large peek results.
    ///
    /// `None` until a `CreateInstance` command has been handled.
    pub peek_stash_persist_location: Option<PersistLocation>,
    /// The logger, from Timely's logging framework, if logs are enabled.
    ///
    /// `None` until logging has been initialized.
    pub compute_logger: Option<logging::compute::Logger>,
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    /// This is intentionally shared between workers.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// History of commands received by this worker and all its peers.
    pub command_history: ComputeCommandHistory<UIntGauge>,
    /// Max size in bytes of any result.
    ///
    /// Starts out as `u64::MAX` and is replaced whenever an `UpdateConfiguration` command
    /// carries a new value.
    max_result_size: u64,
    /// Specification for rendering linear joins.
    ///
    /// Re-derived from `worker_config` every time the worker configuration is applied.
    pub linear_join_spec: LinearJoinSpec,
    /// Metrics for this worker.
    pub metrics: WorkerMetrics,
    /// A process-global handle to tracing configuration.
    tracing_handle: Arc<TracingHandle>,
    /// Other configuration for compute.
    pub context: ComputeInstanceContext,
    /// Per-worker dynamic configuration.
    ///
    /// This is separate from the process-global `ConfigSet` and contains config options that need
    /// to be applied consistently with compute command order.
    ///
    /// For example, for options that influence dataflow rendering it is important that all workers
    /// render the same dataflow with the same options. If these options were stored in a global
    /// `ConfigSet`, we couldn't guarantee that all workers observe changes to them at the same
    /// point in the stream of compute commands. Storing per-worker configuration ensures that
    /// because each worker's configuration is only updated once that worker observes the
    /// respective `UpdateConfiguration` command.
    ///
    /// Reference-counted to avoid cloning for `Context`.
    pub worker_config: Rc<ConfigSet>,

    /// Collections awaiting schedule instruction by the controller.
    ///
    /// Each entry stores a reference to a token that can be dropped to unsuspend the collection's
    /// dataflow. Multiple collections can reference the same token if they are exported by the
    /// same dataflow.
    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,

    /// When this replica/cluster is in read-only mode it must not effect any
    /// changes to external state. This flag can only be changed by a
    /// [ComputeCommand::AllowWrites].
    ///
    /// The flag is initialized to `true`, i.e. the replica starts out read-only.
    ///
    /// Everything running on this replica/cluster must obey this flag. At the
    /// time of writing the only part that is doing this is `persist_sink`.
    ///
    /// NOTE: In the future, we might want a more complicated flag, for example
    /// something that tells us after which timestamp we are allowed to write.
    /// In this first version we are keeping things as simple as possible!
    pub read_only_rx: watch::Receiver<bool>,

    /// Send-side for read-only state.
    pub read_only_tx: watch::Sender<bool>,

    /// Interval at which to perform server maintenance tasks. Set to a zero interval to
    /// perform maintenance with every `step_or_park` invocation.
    ///
    /// Cached from `COMPUTE_SERVER_MAINTENANCE_INTERVAL` whenever the worker configuration is
    /// applied.
    pub server_maintenance_interval: Duration,

    /// The [`mz_ore::now::SYSTEM_TIME`] at which the replica was started.
    ///
    /// Captured when the `ComputeState` is constructed. Used to compute `replica_expiration`.
    pub init_system_time: EpochMillis,

    /// The maximum time for which the replica is expected to live. If not empty, dataflows in the
    /// replica can drop diffs associated with timestamps beyond the replica expiration.
    /// The replica will panic if such dataflows are not dropped before the replica has expired.
    pub replica_expiration: Antichain<Timestamp>,
}
185
186impl ComputeState {
187    /// Construct a new `ComputeState`.
188    pub fn new(
189        persist_clients: Arc<PersistClientCache>,
190        txns_ctx: TxnsContext,
191        metrics: WorkerMetrics,
192        tracing_handle: Arc<TracingHandle>,
193        context: ComputeInstanceContext,
194    ) -> Self {
195        let traces = TraceManager::new(metrics.clone());
196        let command_history = ComputeCommandHistory::new(metrics.for_history());
197
198        // We always initialize as read_only=true. Only when we're explicitly
199        // allowed do we switch to doing writes.
200        let (read_only_tx, read_only_rx) = watch::channel(true);
201
202        Self {
203            collections: Default::default(),
204            dropped_collections: Default::default(),
205            traces,
206            subscribe_response_buffer: Default::default(),
207            copy_to_response_buffer: Default::default(),
208            pending_peeks: Default::default(),
209            peek_stash_persist_location: None,
210            compute_logger: None,
211            persist_clients,
212            txns_ctx,
213            command_history,
214            max_result_size: u64::MAX,
215            linear_join_spec: Default::default(),
216            metrics,
217            tracing_handle,
218            context,
219            worker_config: mz_dyncfgs::all_dyncfgs().into(),
220            suspended_collections: Default::default(),
221            read_only_tx,
222            read_only_rx,
223            server_maintenance_interval: Duration::ZERO,
224            init_system_time: mz_ore::now::SYSTEM_TIME(),
225            replica_expiration: Antichain::default(),
226        }
227    }
228
229    /// Return a mutable reference to the identified collection.
230    ///
231    /// Panics if the collection doesn't exist.
232    pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
233        self.collections
234            .get_mut(&id)
235            .expect("collection must exist")
236    }
237
238    /// Construct a new frontier probe for the given input and add it to the state of the given
239    /// collections.
240    ///
241    /// The caller is responsible for attaching the returned probe handle to the respective
242    /// dataflow input stream.
243    pub fn input_probe_for(
244        &mut self,
245        input_id: GlobalId,
246        collection_ids: impl Iterator<Item = GlobalId>,
247    ) -> probe::Handle<Timestamp> {
248        let probe = probe::Handle::default();
249        for id in collection_ids {
250            if let Some(collection) = self.collections.get_mut(&id) {
251                collection.input_probes.insert(input_id, probe.clone());
252            }
253        }
254        probe
255    }
256
257    /// Apply the current `worker_config` to the compute state.
258    fn apply_worker_config(&mut self) {
259        use mz_compute_types::dyncfgs::*;
260
261        let config = &self.worker_config;
262
263        self.linear_join_spec = LinearJoinSpec::from_config(config);
264
265        if ENABLE_LGALLOC.get(config) {
266            if let Some(path) = &self.context.scratch_directory {
267                let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
268                let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
269                let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
270                let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
271                let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
272                info!(
273                    ?path,
274                    backgrund_interval=?interval,
275                    clear_bytes,
276                    eager_return,
277                    file_growth_dampener,
278                    local_buffer_bytes,
279                    "enabling lgalloc"
280                );
281                let background_worker_config = lgalloc::BackgroundWorkerConfig {
282                    interval,
283                    clear_bytes,
284                };
285                lgalloc::lgalloc_set_config(
286                    lgalloc::LgAlloc::new()
287                        .enable()
288                        .with_path(path.clone())
289                        .with_background_config(background_worker_config)
290                        .eager_return(eager_return)
291                        .file_growth_dampener(file_growth_dampener)
292                        .local_buffer_bytes(local_buffer_bytes),
293                );
294            } else {
295                debug!("not enabling lgalloc, scratch directory not specified");
296            }
297        } else {
298            info!("disabling lgalloc");
299            lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
300        }
301
302        crate::memory_limiter::apply_limiter_config(config);
303
304        mz_ore::region::ENABLE_LGALLOC_REGION.store(
305            ENABLE_COLUMNATION_LGALLOC.get(config),
306            std::sync::atomic::Ordering::Relaxed,
307        );
308
309        let enable_columnar_lgalloc = ENABLE_COLUMNAR_LGALLOC.get(config);
310        mz_timely_util::containers::set_enable_columnar_lgalloc(enable_columnar_lgalloc);
311
312        // Remember the maintenance interval locally to avoid reading it from the config set on
313        // every server iteration.
314        self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);
315
316        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
317        match overflowing_behavior.parse() {
318            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
319            Err(err) => {
320                error!(
321                    err,
322                    overflowing_behavior, "Invalid value for ore_overflowing_behavior"
323                );
324            }
325        }
326    }
327
328    /// Apply the provided replica expiration `offset` by converting it to a frontier relative to
329    /// the replica's initialization system time.
330    ///
331    /// Only expected to be called once when creating the instance. Guards against calling it
332    /// multiple times by checking if the local expiration time is set.
333    pub fn apply_expiration_offset(&mut self, offset: Duration) {
334        if self.replica_expiration.is_empty() {
335            let offset: EpochMillis = offset
336                .as_millis()
337                .try_into()
338                .expect("duration must fit within u64");
339            let replica_expiration_millis = self.init_system_time + offset;
340            let replica_expiration = Timestamp::from(replica_expiration_millis);
341
342            info!(
343                offset = %offset,
344                replica_expiration_millis = %replica_expiration_millis,
345                replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
346                "setting replica expiration",
347            );
348            self.replica_expiration = Antichain::from_elem(replica_expiration);
349
350            // Record the replica expiration in the metrics.
351            self.metrics
352                .replica_expiration_timestamp_seconds
353                .set(replica_expiration.into());
354        }
355    }
356
357    /// Returns the cc or non-cc version of "dataflow_max_inflight_bytes", as
358    /// appropriate to this replica.
359    pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
360        use mz_compute_types::dyncfgs::{
361            DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
362        };
363
364        if self.persist_clients.cfg.is_cc_active {
365            DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
366        } else {
367            DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
368        }
369    }
370}
371
/// A wrapper around [ComputeState] with a live timely worker and response channel.
///
/// All three fields are mutable borrows sharing the lifetime `'a`; the wrapper does not own
/// any of the state it operates on.
pub(crate) struct ActiveComputeState<'a, A: Allocate> {
    /// The underlying Timely worker.
    pub timely_worker: &'a mut TimelyWorker<A>,
    /// The compute state itself.
    pub compute_state: &'a mut ComputeState,
    /// The channel over which frontier information is reported.
    pub response_tx: &'a mut ResponseSender,
}
381
/// An opaque handle that keeps a sink alive for as long as it exists.
///
/// The token merely stores the boxed value; dropping the token drops that value.
pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);

impl SinkToken {
    /// Wrap the given boxed value in a new `SinkToken`.
    pub fn new(t: Box<dyn Any>) -> SinkToken {
        SinkToken(t)
    }
}
391
392impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
393    /// Entrypoint for applying a compute command.
394    #[mz_ore::instrument(level = "debug")]
395    pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
396        use ComputeCommand::*;
397
398        self.compute_state.command_history.push(cmd.clone());
399
400        // Record the command duration, per worker and command kind.
401        let timer = self
402            .compute_state
403            .metrics
404            .handle_command_duration_seconds
405            .for_command(&cmd)
406            .start_timer();
407
408        match cmd {
409            Hello { .. } => panic!("Hello must be captured before"),
410            CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
411            InitializationComplete => (),
412            UpdateConfiguration(params) => self.handle_update_configuration(*params),
413            CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
414            Schedule(id) => self.handle_schedule(id),
415            AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
416            Peek(peek) => {
417                peek.otel_ctx.attach_as_parent();
418                self.handle_peek(*peek)
419            }
420            CancelPeek { uuid } => self.handle_cancel_peek(uuid),
421            AllowWrites => {
422                self.compute_state
423                    .read_only_tx
424                    .send(false)
425                    .expect("we're holding one other end");
426                self.compute_state.persist_clients.cfg().enable_compaction();
427            }
428        }
429
430        timer.observe_duration();
431    }
432
433    fn handle_create_instance(&mut self, config: InstanceConfig) {
434        // Ensure the state is consistent with the config before we initialize anything.
435        self.compute_state.apply_worker_config();
436        if let Some(offset) = config.expiration_offset {
437            self.compute_state.apply_expiration_offset(offset);
438        }
439
440        self.initialize_logging(config.logging);
441
442        self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
443    }
444
445    fn handle_update_configuration(&mut self, params: ComputeParameters) {
446        info!("Applying configuration update: {params:?}");
447
448        let ComputeParameters {
449            workload_class,
450            max_result_size,
451            tracing,
452            grpc_client: _grpc_client,
453            dyncfg_updates,
454        } = params;
455
456        if let Some(v) = workload_class {
457            self.compute_state.metrics.set_workload_class(v);
458        }
459        if let Some(v) = max_result_size {
460            self.compute_state.max_result_size = v;
461        }
462
463        tracing.apply(self.compute_state.tracing_handle.as_ref());
464
465        dyncfg_updates.apply(&self.compute_state.worker_config);
466        self.compute_state
467            .persist_clients
468            .cfg()
469            .apply_from(&dyncfg_updates);
470
471        // Note: We're only updating mz_metrics from the compute state here, but not from the
472        // equivalent storage state. This is because they're running on the same process and
473        // share the metrics.
474        mz_metrics::update_dyncfg(&dyncfg_updates);
475
476        self.compute_state.apply_worker_config();
477    }
478
479    fn handle_create_dataflow(
480        &mut self,
481        dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
482    ) {
483        let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());
484        let as_of = dataflow.as_of.clone().unwrap();
485
486        let dataflow_expiration = dataflow
487            .time_dependence
488            .as_ref()
489            .map(|time_dependence| {
490                self.determine_dataflow_expiration(time_dependence, &dataflow.until)
491            })
492            .unwrap_or_default();
493
494        // Add the dataflow expiration to `until`.
495        let until = dataflow.until.meet(&dataflow_expiration);
496
497        if dataflow.is_transient() {
498            debug!(
499                name = %dataflow.debug_name,
500                import_ids = %dataflow.display_import_ids(),
501                export_ids = %dataflow.display_export_ids(),
502                as_of = ?as_of.elements(),
503                time_dependence = ?dataflow.time_dependence,
504                expiration = ?dataflow_expiration.elements(),
505                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
506                plan_until = ?dataflow.until.elements(),
507                until = ?until.elements(),
508                "creating dataflow",
509            );
510        } else {
511            info!(
512                name = %dataflow.debug_name,
513                import_ids = %dataflow.display_import_ids(),
514                export_ids = %dataflow.display_export_ids(),
515                as_of = ?as_of.elements(),
516                time_dependence = ?dataflow.time_dependence,
517                expiration = ?dataflow_expiration.elements(),
518                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
519                plan_until = ?dataflow.until.elements(),
520                until = ?until.elements(),
521                "creating dataflow",
522            );
523        };
524
525        let subscribe_copy_ids: BTreeSet<_> = dataflow
526            .subscribe_ids()
527            .chain(dataflow.copy_to_ids())
528            .collect();
529
530        // Initialize compute and logging state for each object.
531        for object_id in dataflow.export_ids() {
532            let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
533            let metrics = self.compute_state.metrics.for_collection(object_id);
534            let mut collection = CollectionState::new(
535                Rc::clone(&dataflow_index),
536                is_subscribe_or_copy,
537                as_of.clone(),
538                metrics,
539            );
540
541            if let Some(logger) = self.compute_state.compute_logger.clone() {
542                let logging = CollectionLogging::new(
543                    object_id,
544                    logger,
545                    *dataflow_index,
546                    dataflow.import_ids(),
547                );
548                collection.logging = Some(logging);
549            }
550
551            collection.reset_reported_frontiers(ReportedFrontier::NotReported {
552                lower: as_of.clone(),
553            });
554
555            let existing = self.compute_state.collections.insert(object_id, collection);
556            if existing.is_some() {
557                error!(
558                    id = ?object_id,
559                    "existing collection for newly created dataflow",
560                );
561            }
562        }
563
564        let (start_signal, suspension_token) = StartSignal::new();
565        for id in dataflow.export_ids() {
566            self.compute_state
567                .suspended_collections
568                .insert(id, Rc::clone(&suspension_token));
569        }
570
571        crate::render::build_compute_dataflow(
572            self.timely_worker,
573            self.compute_state,
574            dataflow,
575            start_signal,
576            until,
577            dataflow_expiration,
578        );
579    }
580
581    fn handle_schedule(&mut self, id: GlobalId) {
582        // A `Schedule` command instructs us to begin dataflow computation for a collection, so
583        // we should unsuspend it by dropping the corresponding suspension token. Note that a
584        // dataflow can export multiple collections and they all share one suspension token, so the
585        // computation of a dataflow will only start once all its exported collections have been
586        // scheduled.
587        let suspension_token = self.compute_state.suspended_collections.remove(&id);
588        drop(suspension_token);
589    }
590
591    fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
592        if frontier.is_empty() {
593            // Indicates that we may drop `id`, as there are no more valid times to read.
594            self.drop_collection(id);
595        } else {
596            self.compute_state
597                .traces
598                .allow_compaction(id, frontier.borrow());
599        }
600    }
601
602    #[mz_ore::instrument(level = "debug")]
603    fn handle_peek(&mut self, peek: Peek) {
604        let pending = match &peek.target {
605            PeekTarget::Index { id } => {
606                // Acquire a copy of the trace suitable for fulfilling the peek.
607                let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
608                PendingPeek::index(peek, trace_bundle)
609            }
610            PeekTarget::Persist { metadata, .. } => {
611                let metadata = metadata.clone();
612                PendingPeek::persist(
613                    peek,
614                    Arc::clone(&self.compute_state.persist_clients),
615                    metadata,
616                    usize::cast_from(self.compute_state.max_result_size),
617                    self.timely_worker,
618                )
619            }
620        };
621
622        // Log the receipt of the peek.
623        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
624            logger.log(&pending.as_log_event(true));
625        }
626
627        self.process_peek(&mut Antichain::new(), pending);
628    }
629
630    fn handle_cancel_peek(&mut self, uuid: Uuid) {
631        if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
632            self.send_peek_response(peek, PeekResponse::Canceled);
633        }
634    }
635
636    /// Arrange for the given collection to be dropped.
637    ///
638    /// Collection dropping occurs in three phases:
639    ///
640    ///  1. This method removes the collection from the [`ComputeState`] and drops its
641    ///     [`CollectionState`], including its held dataflow tokens. It then adds the dropped
642    ///     collection to `dropped_collections`.
643    ///  2. The next step of the Timely worker lets the source operators observe the token drops
644    ///     and shut themselves down.
645    ///  3. `report_dropped_collections` removes the entry from `dropped_collections` and emits any
646    ///     outstanding final responses required by the compute protocol.
647    ///
648    /// These steps ensure that we don't report a collection as dropped to the controller before it
649    /// has stopped reading from its inputs. Doing so would allow the controller to release its
650    /// read holds on the inputs, which could lead to panics from the replica trying to read
651    /// already compacted times.
652    fn drop_collection(&mut self, id: GlobalId) {
653        let collection = self
654            .compute_state
655            .collections
656            .remove(&id)
657            .expect("dropped untracked collection");
658
659        // If this collection is an index, remove its trace.
660        self.compute_state.traces.remove(&id);
661        // If the collection is unscheduled, remove it from the list of waiting collections.
662        self.compute_state.suspended_collections.remove(&id);
663
664        if ENABLE_ACTIVE_DATAFLOW_CANCELATION.get(&self.compute_state.worker_config) {
665            // Drop the dataflow, if all its exports have been dropped.
666            if let Ok(index) = Rc::try_unwrap(collection.dataflow_index) {
667                self.timely_worker.drop_dataflow(index);
668            }
669        }
670
671        // Remember the collection as dropped, for emission of outstanding final compute responses.
672        let dropped = DroppedCollection {
673            reported_frontiers: collection.reported_frontiers,
674            is_subscribe_or_copy: collection.is_subscribe_or_copy,
675        };
676        self.compute_state.dropped_collections.push((id, dropped));
677    }
678
679    /// Initializes timely dataflow logging and publishes as a view.
680    pub fn initialize_logging(&mut self, config: LoggingConfig) {
681        if self.compute_state.compute_logger.is_some() {
682            panic!("dataflow server has already initialized logging");
683        }
684
685        let LoggingTraces {
686            traces,
687            dataflow_index,
688            compute_logger: logger,
689        } = logging::initialize(self.timely_worker, &config);
690
691        let dataflow_index = Rc::new(dataflow_index);
692        let mut log_index_ids = config.index_logs;
693        for (log, trace) in traces {
694            // Install trace as maintained index.
695            let id = log_index_ids
696                .remove(&log)
697                .expect("`logging::initialize` does not invent logs");
698            self.compute_state.traces.set(id, trace);
699
700            // Initialize compute and logging state for the logging index.
701            let is_subscribe_or_copy = false;
702            let as_of = Antichain::from_elem(Timestamp::MIN);
703            let metrics = self.compute_state.metrics.for_collection(id);
704            let mut collection = CollectionState::new(
705                Rc::clone(&dataflow_index),
706                is_subscribe_or_copy,
707                as_of,
708                metrics,
709            );
710
711            let logging =
712                CollectionLogging::new(id, logger.clone(), *dataflow_index, std::iter::empty());
713            collection.logging = Some(logging);
714
715            let existing = self.compute_state.collections.insert(id, collection);
716            if existing.is_some() {
717                error!(
718                    id = ?id,
719                    "existing collection for newly initialized logging export",
720                );
721            }
722        }
723
724        // Sanity check.
725        assert!(
726            log_index_ids.is_empty(),
727            "failed to create requested logging indexes: {log_index_ids:?}",
728        );
729
730        self.compute_state.compute_logger = Some(logger);
731    }
732
733    /// Send progress information to the controller.
734    pub fn report_frontiers(&mut self) {
735        let mut responses = Vec::new();
736
737        // Maintain a single allocation for `new_frontier` to avoid allocating on every iteration.
738        let mut new_frontier = Antichain::new();
739
740        for (&id, collection) in self.compute_state.collections.iter_mut() {
741            // The compute protocol does not allow `Frontiers` responses for subscribe and copy-to
742            // collections (database-issues#4701).
743            if collection.is_subscribe_or_copy {
744                continue;
745            }
746
747            let reported = collection.reported_frontiers();
748
749            // Collect the write frontier and check for progress.
750            new_frontier.clear();
751            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
752                assert!(
753                    collection.sink_write_frontier.is_none(),
754                    "collection {id} has multiple frontiers"
755                );
756                traces.oks_mut().read_upper(&mut new_frontier);
757            } else if let Some(frontier) = &collection.sink_write_frontier {
758                new_frontier.clone_from(&frontier.borrow());
759            } else {
760                error!(id = ?id, "collection without write frontier");
761                continue;
762            }
763            let new_write_frontier = reported
764                .write_frontier
765                .allows_reporting(&new_frontier)
766                .then(|| new_frontier.clone());
767
768            // Collect the output frontier and check for progress.
769            //
770            // By default, the output frontier equals the write frontier (which is still stored in
771            // `new_frontier`). If the collection provides a compute frontier, we construct the
772            // output frontier by taking the meet of write and compute frontier, to avoid:
773            //  * reporting progress through times we have not yet written
774            //  * reporting progress through times we have not yet fully processed, for
775            //    collections that jump their write frontiers into the future
776            if let Some(probe) = &collection.compute_probe {
777                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
778            }
779            let new_output_frontier = reported
780                .output_frontier
781                .allows_reporting(&new_frontier)
782                .then(|| new_frontier.clone());
783
784            // Collect the input frontier and check for progress.
785            new_frontier.clear();
786            for probe in collection.input_probes.values() {
787                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
788            }
789            let new_input_frontier = reported
790                .input_frontier
791                .allows_reporting(&new_frontier)
792                .then(|| new_frontier.clone());
793
794            if let Some(frontier) = &new_write_frontier {
795                collection
796                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
797            }
798            if let Some(frontier) = &new_input_frontier {
799                collection
800                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
801            }
802            if let Some(frontier) = &new_output_frontier {
803                collection
804                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
805            }
806
807            let response = FrontiersResponse {
808                write_frontier: new_write_frontier,
809                input_frontier: new_input_frontier,
810                output_frontier: new_output_frontier,
811            };
812            if response.has_updates() {
813                responses.push((id, response));
814            }
815        }
816
817        for (id, frontiers) in responses {
818            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
819        }
820    }
821
822    /// Report dropped collections to the controller.
823    pub fn report_dropped_collections(&mut self) {
824        let dropped_collections = std::mem::take(&mut self.compute_state.dropped_collections);
825
826        for (id, collection) in dropped_collections {
827            // The compute protocol requires us to send a `Frontiers` response with empty frontiers
828            // when a collection was dropped, unless:
829            //  * The frontier was already reported as empty previously, or
830            //  * The collection is a subscribe or copy-to.
831
832            if collection.is_subscribe_or_copy {
833                continue;
834            }
835
836            let reported = collection.reported_frontiers;
837            let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
838            let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
839            let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);
840
841            let frontiers = FrontiersResponse {
842                write_frontier,
843                input_frontier,
844                output_frontier,
845            };
846            if frontiers.has_updates() {
847                self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
848            }
849        }
850    }
851
852    /// Report per-worker metrics.
853    pub(crate) fn report_metrics(&self) {
854        if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
855            let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
856            let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
857            let remaining = expiration - now;
858            self.compute_state
859                .metrics
860                .replica_expiration_remaining_seconds
861                .set(remaining)
862        }
863    }
864
865    /// Either complete the peek (and send the response) or put it in the pending set.
866    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
867        let response = match &mut peek {
868            PendingPeek::Index(peek) => {
869                let peek_stash_eligible = peek
870                    .peek
871                    .finishing
872                    .is_streamable(peek.peek.result_desc.arity());
873
874                let peek_stash_enabled = {
875                    let enabled = ENABLE_PEEK_RESPONSE_STASH.get(&self.compute_state.worker_config);
876                    let peek_persist_stash_available =
877                        self.compute_state.peek_stash_persist_location.is_some();
878                    if !peek_persist_stash_available && enabled {
879                        tracing::error!(
880                            "missing peek_stash_persist_location but peek stash is enabled"
881                        );
882                    }
883                    enabled && peek_persist_stash_available
884                };
885
886                let peek_stash_threshold_bytes =
887                    PEEK_RESPONSE_STASH_THRESHOLD_BYTES.get(&self.compute_state.worker_config);
888
889                match peek.seek_fulfillment(
890                    upper,
891                    self.compute_state.max_result_size,
892                    peek_stash_enabled && peek_stash_eligible,
893                    peek_stash_threshold_bytes,
894                ) {
895                    PeekStatus::Ready(result) => Some(result),
896                    PeekStatus::NotReady => None,
897                    PeekStatus::UsePeekStash => {
898                        let _span =
899                            span!(parent: &peek.span, Level::DEBUG, "process_stash_peek").entered();
900
901                        let peek_stash_batch_max_runs = PEEK_RESPONSE_STASH_BATCH_MAX_RUNS
902                            .get(&self.compute_state.worker_config);
903
904                        let stash_task = peek_stash::StashingPeek::start_upload(
905                            Arc::clone(&self.compute_state.persist_clients),
906                            self.compute_state
907                                .peek_stash_persist_location
908                                .as_ref()
909                                .expect("verified above"),
910                            peek.peek.clone(),
911                            peek.trace_bundle.clone(),
912                            peek_stash_batch_max_runs,
913                        );
914
915                        self.compute_state
916                            .pending_peeks
917                            .insert(peek.peek.uuid, PendingPeek::Stash(stash_task));
918                        return;
919                    }
920                }
921            }
922            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
923                self.compute_state
924                    .metrics
925                    .persist_peek_seconds
926                    .observe(duration.as_secs_f64());
927                result
928            }),
929            PendingPeek::Stash(stashing_peek) => {
930                let num_batches = PEEK_STASH_NUM_BATCHES.get(&self.compute_state.worker_config);
931                let batch_size = PEEK_STASH_BATCH_SIZE.get(&self.compute_state.worker_config);
932                stashing_peek.pump_rows(num_batches, batch_size);
933
934                if let Ok((response, duration)) = stashing_peek.result.try_recv() {
935                    self.compute_state
936                        .metrics
937                        .stashed_peek_seconds
938                        .observe(duration.as_secs_f64());
939                    tracing::trace!(?stashing_peek.peek, ?duration, "finished stashing peek response in persist");
940
941                    Some(response)
942                } else {
943                    None
944                }
945            }
946        };
947
948        if let Some(response) = response {
949            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek_response").entered();
950            self.send_peek_response(peek, response)
951        } else {
952            let uuid = peek.peek().uuid;
953            self.compute_state.pending_peeks.insert(uuid, peek);
954        }
955    }
956
957    /// Scan pending peeks and attempt to retire each.
958    pub fn process_peeks(&mut self) {
959        let mut upper = Antichain::new();
960        let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
961        for (_uuid, peek) in pending_peeks {
962            self.process_peek(&mut upper, peek);
963        }
964    }
965
966    /// Sends a response for this peek's resolution to the coordinator.
967    ///
968    /// Note that this function takes ownership of the `PendingPeek`, which is
969    /// meant to prevent multiple responses to the same peek.
970    #[mz_ore::instrument(level = "debug")]
971    fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
972        let log_event = peek.as_log_event(false);
973        // Respond with the response.
974        self.send_compute_response(ComputeResponse::PeekResponse(
975            peek.peek().uuid,
976            response,
977            OpenTelemetryContext::obtain(),
978        ));
979
980        // Log responding to the peek request.
981        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
982            logger.log(&log_event);
983        }
984    }
985
986    /// Scan the shared subscribe response buffer, and forward results along.
987    pub fn process_subscribes(&mut self) {
988        let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
989        for (sink_id, mut response) in subscribe_responses.drain(..) {
990            // Update frontier logging for this subscribe.
991            if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
992                let new_frontier = match &response {
993                    SubscribeResponse::Batch(b) => b.upper.clone(),
994                    SubscribeResponse::DroppedAt(_) => Antichain::new(),
995                };
996
997                let reported = collection.reported_frontiers();
998                assert!(
999                    reported.write_frontier.allows_reporting(&new_frontier),
1000                    "subscribe write frontier regression: {:?} -> {:?}",
1001                    reported.write_frontier,
1002                    new_frontier,
1003                );
1004                assert!(
1005                    reported.input_frontier.allows_reporting(&new_frontier),
1006                    "subscribe input frontier regression: {:?} -> {:?}",
1007                    reported.input_frontier,
1008                    new_frontier,
1009                );
1010
1011                collection
1012                    .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
1013                collection
1014                    .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
1015                collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
1016            } else {
1017                // Presumably tracking state for this subscribe was already dropped by
1018                // `drop_collection`. There is nothing left to do for logging.
1019            }
1020
1021            response
1022                .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
1023            self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
1024        }
1025    }
1026
1027    /// Scan the shared copy to response buffer, and forward results along.
1028    pub fn process_copy_tos(&self) {
1029        let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
1030        for (sink_id, response) in responses.drain(..) {
1031            self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
1032        }
1033    }
1034
1035    /// Send a response to the coordinator.
1036    fn send_compute_response(&self, response: ComputeResponse) {
1037        // Ignore send errors because the coordinator is free to ignore our
1038        // responses. This happens during shutdown.
1039        let _ = self.response_tx.send(response);
1040    }
1041
1042    /// Checks for dataflow expiration. Panics if we're past the replica expiration time.
1043    pub(crate) fn check_expiration(&self) {
1044        let now = mz_ore::now::SYSTEM_TIME();
1045        if self.compute_state.replica_expiration.less_than(&now.into()) {
1046            let now_datetime = mz_ore::now::to_datetime(now);
1047            let expiration_datetime = self
1048                .compute_state
1049                .replica_expiration
1050                .as_option()
1051                .map(Into::into)
1052                .map(mz_ore::now::to_datetime);
1053
1054            // We error and assert separately to produce structured logs in anything that depends
1055            // on tracing.
1056            error!(
1057                now,
1058                now_datetime = ?now_datetime,
1059                expiration = ?self.compute_state.replica_expiration.elements(),
1060                expiration_datetime = ?expiration_datetime,
1061                "replica expired"
1062            );
1063
1064            // Repeat condition for better error message.
1065            assert!(
1066                !self.compute_state.replica_expiration.less_than(&now.into()),
1067                "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
1068                self.compute_state.replica_expiration.elements(),
1069            );
1070        }
1071    }
1072
1073    /// Returns the dataflow expiration, i.e, the timestamp beyond which diffs can be
1074    /// dropped.
1075    ///
1076    /// Returns an empty timestamp if `replica_expiration` is unset or matches conditions under
1077    /// which dataflow expiration should be disabled.
1078    pub fn determine_dataflow_expiration(
1079        &self,
1080        time_dependence: &TimeDependence,
1081        until: &Antichain<mz_repr::Timestamp>,
1082    ) -> Antichain<mz_repr::Timestamp> {
1083        // Evaluate time dependence with respect to the expiration time.
1084        // * Step time forward to ensure the expiration time is different to the moment a dataflow
1085        //   can legitimately jump to.
1086        // * We cannot expire dataflow with an until that is less or equal to the expiration time.
1087        let iter = self
1088            .compute_state
1089            .replica_expiration
1090            .iter()
1091            .filter_map(|t| time_dependence.apply(*t))
1092            .filter_map(|t| mz_repr::Timestamp::try_step_forward(&t))
1093            .filter(|expiration| !until.less_equal(expiration));
1094        Antichain::from_iter(iter)
1095    }
1096}
1097
/// A peek that has not been responded to yet.
///
/// A peek reads either from an index or from a Persist collection. An index peek can
/// additionally be in the process of being stashed in the peek stash.
///
/// Note that `PendingPeek` intentionally does not implement or derive `Clone`,
/// as each `PendingPeek` is meant to be dropped after it's responded to.
pub enum PendingPeek {
    /// A peek against an index. (Possibly a temporary index created for the purpose.)
    Index(IndexPeek),
    /// A peek against a Persist-backed collection.
    Persist(PersistPeek),
    /// A peek against an index that is being stashed in the peek stash by an
    /// async background task.
    Stash(peek_stash::StashingPeek),
}
1111
1112impl PendingPeek {
1113    /// Produces a corresponding log event.
1114    pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
1115        let peek = self.peek();
1116        let (id, peek_type) = match &peek.target {
1117            PeekTarget::Index { id } => (*id, logging::compute::PeekType::Index),
1118            PeekTarget::Persist { id, .. } => (*id, logging::compute::PeekType::Persist),
1119        };
1120        let uuid = peek.uuid.into_bytes();
1121        ComputeEvent::Peek(PeekEvent {
1122            id,
1123            time: peek.timestamp,
1124            uuid,
1125            peek_type,
1126            installed,
1127        })
1128    }
1129
1130    fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
1131        let empty_frontier = Antichain::new();
1132        let timestamp_frontier = Antichain::from_elem(peek.timestamp);
1133        trace_bundle
1134            .oks_mut()
1135            .set_logical_compaction(timestamp_frontier.borrow());
1136        trace_bundle
1137            .errs_mut()
1138            .set_logical_compaction(timestamp_frontier.borrow());
1139        trace_bundle
1140            .oks_mut()
1141            .set_physical_compaction(empty_frontier.borrow());
1142        trace_bundle
1143            .errs_mut()
1144            .set_physical_compaction(empty_frontier.borrow());
1145
1146        PendingPeek::Index(IndexPeek {
1147            peek,
1148            trace_bundle,
1149            span: tracing::Span::current(),
1150        })
1151    }
1152
1153    fn persist<A: Allocate>(
1154        peek: Peek,
1155        persist_clients: Arc<PersistClientCache>,
1156        metadata: CollectionMetadata,
1157        max_result_size: usize,
1158        timely_worker: &TimelyWorker<A>,
1159    ) -> Self {
1160        let active_worker = {
1161            // Choose the worker that does the actual peek arbitrarily but consistently.
1162            let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
1163            chosen_index == timely_worker.index()
1164        };
1165        let activator = timely_worker.sync_activator_for([].into());
1166        let peek_uuid = peek.uuid;
1167
1168        let (result_tx, result_rx) = oneshot::channel();
1169        let timestamp = peek.timestamp;
1170        let mfp_plan = peek.map_filter_project.clone();
1171        let max_results_needed = peek
1172            .finishing
1173            .limit
1174            .map(|l| usize::cast_from(u64::from(l)))
1175            .unwrap_or(usize::MAX)
1176            + peek.finishing.offset;
1177        let order_by = peek.finishing.order_by.clone();
1178
1179        // Persist peeks can include at most one literal constraint.
1180        let literal_constraint = peek
1181            .literal_constraints
1182            .clone()
1183            .map(|rows| rows.into_element());
1184
1185        let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
1186            let start = Instant::now();
1187            let result = if active_worker {
1188                PersistPeek::do_peek(
1189                    &persist_clients,
1190                    metadata,
1191                    timestamp,
1192                    literal_constraint,
1193                    mfp_plan,
1194                    max_result_size,
1195                    max_results_needed,
1196                )
1197                .await
1198            } else {
1199                Ok(vec![])
1200            };
1201            let result = match result {
1202                Ok(rows) => PeekResponse::Rows(RowCollection::new(rows, &order_by)),
1203                Err(e) => PeekResponse::Error(e.to_string()),
1204            };
1205            match result_tx.send((result, start.elapsed())) {
1206                Ok(()) => {}
1207                Err((_result, elapsed)) => {
1208                    debug!(duration =? elapsed, "dropping result for cancelled peek {peek_uuid}")
1209                }
1210            }
1211            match activator.activate() {
1212                Ok(()) => {}
1213                Err(_) => {
1214                    debug!("unable to wake timely after completed peek {peek_uuid}");
1215                }
1216            }
1217        });
1218        PendingPeek::Persist(PersistPeek {
1219            peek,
1220            _abort_handle: task_handle.abort_on_drop(),
1221            result: result_rx,
1222            span: tracing::Span::current(),
1223        })
1224    }
1225
1226    fn span(&self) -> &tracing::Span {
1227        match self {
1228            PendingPeek::Index(p) => &p.span,
1229            PendingPeek::Persist(p) => &p.span,
1230            PendingPeek::Stash(p) => &p.span,
1231        }
1232    }
1233
1234    pub(crate) fn peek(&self) -> &Peek {
1235        match self {
1236            PendingPeek::Index(p) => &p.peek,
1237            PendingPeek::Persist(p) => &p.peek,
1238            PendingPeek::Stash(p) => &p.peek,
1239        }
1240    }
1241}
1242
/// An in-progress Persist peek.
///
/// Like `PendingPeek`, `PersistPeek` intentionally does not implement or derive `Clone`,
/// as each peek is meant to be dropped after it's responded to.
pub struct PersistPeek {
    /// The description of the peek.
    pub(crate) peek: Peek,
    /// A background task that's responsible for producing the peek results.
    /// If we're no longer interested in the results, we abort the task.
    _abort_handle: AbortOnDropHandle<()>,
    /// The result of the background task, eventually, together with the time
    /// the task took to produce it.
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    /// The `tracing::Span` tracking this peek's operation
    span: tracing::Span,
}
1257
1258impl PersistPeek {
1259    async fn do_peek(
1260        persist_clients: &PersistClientCache,
1261        metadata: CollectionMetadata,
1262        as_of: Timestamp,
1263        literal_constraint: Option<Row>,
1264        mfp_plan: SafeMfpPlan,
1265        max_result_size: usize,
1266        mut limit_remaining: usize,
1267    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
1268        let client = persist_clients
1269            .open(metadata.persist_location)
1270            .await
1271            .map_err(|e| e.to_string())?;
1272
1273        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
1274            .open_leased_reader(
1275                metadata.data_shard,
1276                Arc::new(metadata.relation_desc.clone()),
1277                Arc::new(UnitSchema),
1278                Diagnostics::from_purpose("persist::peek"),
1279                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
1280            )
1281            .await
1282            .map_err(|e| e.to_string())?;
1283
1284        // If we are using txn-wal for this collection, then the upper might
1285        // be advanced lazily and we have to go through txn-wal for reads.
1286        //
1287        // TODO: If/when we have a process-wide TxnsRead worker for clusterd,
1288        // use in here (instead of opening a new TxnsCache) to save a persist
1289        // reader registration and some txns shard read traffic.
1290        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
1291            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
1292        } else {
1293            None
1294        };
1295
1296        let metrics = client.metrics();
1297
1298        let mut cursor = StatsCursor::new(
1299            &mut reader,
1300            txns_read.as_mut(),
1301            metrics,
1302            &mfp_plan,
1303            &metadata.relation_desc,
1304            Antichain::from_elem(as_of),
1305        )
1306        .await
1307        .map_err(|since| {
1308            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
1309        })?;
1310
1311        // Re-used state for processing and building rows.
1312        let mut result = vec![];
1313        let mut datum_vec = DatumVec::new();
1314        let mut row_builder = Row::default();
1315        let arena = RowArena::new();
1316        let mut total_size = 0usize;
1317
1318        let literal_len = match &literal_constraint {
1319            None => 0,
1320            Some(row) => row.iter().count(),
1321        };
1322
1323        'collect: while limit_remaining > 0 {
1324            let Some(batch) = cursor.next().await else {
1325                break;
1326            };
1327            for (data, _, d) in batch {
1328                let row = data.map_err(|e| e.to_string())?;
1329
1330                if let Some(literal) = &literal_constraint {
1331                    match row.iter().take(literal_len).cmp(literal.iter()) {
1332                        Ordering::Less => continue,
1333                        Ordering::Equal => {}
1334                        Ordering::Greater => break 'collect,
1335                    }
1336                }
1337
1338                let count: usize = d.try_into().map_err(|_| {
1339                    tracing::error!(
1340                        shard = %metadata.data_shard, diff = d, ?row,
1341                        "persist peek encountered negative multiplicities",
1342                    );
1343                    format!(
1344                        "Invalid data in source, \
1345                         saw retractions ({}) for row that does not exist: {:?}",
1346                        -d, row,
1347                    )
1348                })?;
1349                let Some(count) = NonZeroUsize::new(count) else {
1350                    continue;
1351                };
1352                let mut datum_local = datum_vec.borrow_with(&row);
1353                let eval_result = mfp_plan
1354                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
1355                    .map(|row| row.cloned())
1356                    .map_err(|e| e.to_string())?;
1357                if let Some(row) = eval_result {
1358                    total_size = total_size
1359                        .saturating_add(row.byte_len())
1360                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
1361                    if total_size > max_result_size {
1362                        return Err(format!(
1363                            "result exceeds max size of {}",
1364                            ByteSize::b(u64::cast_from(max_result_size))
1365                        ));
1366                    }
1367                    result.push((row, count));
1368                    limit_remaining = limit_remaining.saturating_sub(count.get());
1369                    if limit_remaining == 0 {
1370                        break;
1371                    }
1372                }
1373            }
1374        }
1375
1376        Ok(result)
1377    }
1378}
1379
/// An in-progress index-backed peek, and data to eventually fulfill it.
pub struct IndexPeek {
    /// The description of the peek.
    peek: Peek,
    /// The traces (oks and errs) of the index that are read to fulfill the peek.
    trace_bundle: TraceBundle,
    /// The `tracing::Span` tracking this peek's operation
    span: tracing::Span,
}
1388
1389impl IndexPeek {
1390    /// Attempts to fulfill the peek and reports success.
1391    ///
1392    /// To produce output at `peek.timestamp`, we must be certain that
1393    /// it is no longer changing. A trace guarantees that all future
1394    /// changes will be greater than or equal to an element of `upper`.
1395    ///
1396    /// If an element of `upper` is less or equal to `peek.timestamp`,
1397    /// then there can be further updates that would change the output.
1398    /// If no element of `upper` is less or equal to `peek.timestamp`,
1399    /// then for any time `t` less or equal to `peek.timestamp` it is
1400    /// not the case that `upper` is less or equal to that timestamp,
1401    /// and so the result cannot further evolve.
1402    fn seek_fulfillment(
1403        &mut self,
1404        upper: &mut Antichain<Timestamp>,
1405        max_result_size: u64,
1406        peek_stash_eligible: bool,
1407        peek_stash_threshold_bytes: usize,
1408    ) -> PeekStatus {
1409        self.trace_bundle.oks_mut().read_upper(upper);
1410        if upper.less_equal(&self.peek.timestamp) {
1411            return PeekStatus::NotReady;
1412        }
1413        self.trace_bundle.errs_mut().read_upper(upper);
1414        if upper.less_equal(&self.peek.timestamp) {
1415            return PeekStatus::NotReady;
1416        }
1417
1418        let read_frontier = self.trace_bundle.compaction_frontier();
1419        if !read_frontier.less_equal(&self.peek.timestamp) {
1420            let error = format!(
1421                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
1422                read_frontier.elements(),
1423                self.peek.timestamp,
1424            );
1425            return PeekStatus::Ready(PeekResponse::Error(error));
1426        }
1427
1428        self.collect_finished_data(
1429            max_result_size,
1430            peek_stash_eligible,
1431            peek_stash_threshold_bytes,
1432        )
1433    }
1434
1435    /// Collects data for a known-complete peek from the ok stream.
1436    fn collect_finished_data(
1437        &mut self,
1438        max_result_size: u64,
1439        peek_stash_eligible: bool,
1440        peek_stash_threshold_bytes: usize,
1441    ) -> PeekStatus {
1442        // Check if there exist any errors and, if so, return whatever one we
1443        // find first.
1444        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
1445        while cursor.key_valid(&storage) {
1446            let mut copies = Diff::ZERO;
1447            cursor.map_times(&storage, |time, diff| {
1448                if time.less_equal(&self.peek.timestamp) {
1449                    copies += diff;
1450                }
1451            });
1452            if copies.is_negative() {
1453                let error = cursor.key(&storage);
1454                tracing::error!(
1455                    target = %self.peek.target.id(), diff = %copies, %error,
1456                    "index peek encountered negative multiplicities in error trace",
1457                );
1458                return PeekStatus::Ready(PeekResponse::Error(format!(
1459                    "Invalid data in source errors, \
1460                    saw retractions ({}) for row that does not exist: {}",
1461                    -copies, error,
1462                )));
1463            }
1464            if copies.is_positive() {
1465                return PeekStatus::Ready(PeekResponse::Error(cursor.key(&storage).to_string()));
1466            }
1467            cursor.step_key(&storage);
1468        }
1469
1470        Self::collect_ok_finished_data(
1471            &self.peek,
1472            self.trace_bundle.oks_mut(),
1473            max_result_size,
1474            peek_stash_eligible,
1475            peek_stash_threshold_bytes,
1476        )
1477    }
1478
1479    /// Collects data for a known-complete peek from the ok stream.
1480    fn collect_ok_finished_data<Tr>(
1481        peek: &Peek<Timestamp>,
1482        oks_handle: &mut Tr,
1483        max_result_size: u64,
1484        peek_stash_eligible: bool,
1485        peek_stash_threshold_bytes: usize,
1486    ) -> PeekStatus
1487    where
1488        for<'a> Tr: TraceReader<
1489                Key<'a>: ToDatumIter + Eq,
1490                KeyOwn = Row,
1491                Val<'a>: ToDatumIter,
1492                TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
1493                DiffGat<'a> = &'a Diff,
1494            >,
1495    {
1496        let max_result_size = usize::cast_from(max_result_size);
1497        let count_byte_size = size_of::<NonZeroUsize>();
1498
1499        // We clone `literal_constraints` here because we don't want to move the constraints
1500        // out of the peek struct, and don't want to modify in-place.
1501        let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
1502            peek.target.id().clone(),
1503            peek.map_filter_project.clone(),
1504            peek.timestamp,
1505            peek.literal_constraints.clone().as_deref_mut(),
1506            oks_handle,
1507        );
1508
1509        // Accumulated `Vec<(row, count)>` results that we are likely to return.
1510        let mut results = Vec::new();
1511        let mut total_size: usize = 0;
1512
1513        // When set, a bound on the number of records we need to return.
1514        // The requirements on the records are driven by the finishing's
1515        // `order_by` field. Further limiting will happen when the results
1516        // are collected, so we don't need to have exactly this many results,
1517        // just at least those results that would have been returned.
1518        let max_results = peek.finishing.num_rows_needed();
1519
1520        let mut l_datum_vec = DatumVec::new();
1521        let mut r_datum_vec = DatumVec::new();
1522
1523        while let Some(row) = peek_iterator.next() {
1524            let row = match row {
1525                Ok(row) => row,
1526                Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)),
1527            };
1528            let (row, copies) = row;
1529            let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");
1530
1531            total_size = total_size
1532                .saturating_add(row.byte_len())
1533                .saturating_add(count_byte_size);
1534            if peek_stash_eligible && total_size > peek_stash_threshold_bytes {
1535                return PeekStatus::UsePeekStash;
1536            }
1537            if total_size > max_result_size {
1538                return PeekStatus::Ready(PeekResponse::Error(format!(
1539                    "result exceeds max size of {}",
1540                    ByteSize::b(u64::cast_from(max_result_size))
1541                )));
1542            }
1543
1544            results.push((row, copies));
1545
1546            // If we hold many more than `max_results` records, we can thin down
1547            // `results` using `self.finishing.ordering`.
1548            if let Some(max_results) = max_results {
1549                // We use a threshold twice what we intend, to amortize the work
1550                // across all of the insertions. We could tighten this, but it
1551                // works for the moment.
1552                if results.len() >= 2 * max_results {
1553                    if peek.finishing.order_by.is_empty() {
1554                        results.truncate(max_results);
1555                        return PeekStatus::Ready(PeekResponse::Rows(RowCollection::new(
1556                            results,
1557                            &peek.finishing.order_by,
1558                        )));
1559                    } else {
1560                        // We can sort `results` and then truncate to `max_results`.
1561                        // This has an effect similar to a priority queue, without
1562                        // its interactive dequeueing properties.
1563                        // TODO: Had we left these as `Vec<Datum>` we would avoid
1564                        // the unpacking; we should consider doing that, although
1565                        // it will require a re-pivot of the code to branch on this
1566                        // inner test (as we prefer not to maintain `Vec<Datum>`
1567                        // in the other case).
1568                        results.sort_by(|left, right| {
1569                            let left_datums = l_datum_vec.borrow_with(&left.0);
1570                            let right_datums = r_datum_vec.borrow_with(&right.0);
1571                            mz_expr::compare_columns(
1572                                &peek.finishing.order_by,
1573                                &left_datums,
1574                                &right_datums,
1575                                || left.0.cmp(&right.0),
1576                            )
1577                        });
1578                        let dropped = results.drain(max_results..);
1579                        let dropped_size =
1580                            dropped.into_iter().fold(0, |acc: usize, (row, _count)| {
1581                                acc.saturating_add(row.byte_len().saturating_add(count_byte_size))
1582                            });
1583                        total_size = total_size.saturating_sub(dropped_size);
1584                    }
1585                }
1586            }
1587        }
1588
1589        PeekStatus::Ready(PeekResponse::Rows(RowCollection::new(
1590            results,
1591            &peek.finishing.order_by,
1592        )))
1593    }
1594}
1595
/// For keeping track of the state of pending or ready peeks, and managing
/// control flow.
///
/// This is the result of an attempt to fulfill a peek against an arrangement.
enum PeekStatus {
    /// The frontiers of objects are not yet advanced enough, peek is still
    /// pending.
    NotReady,
    /// The result size is above the configured threshold and the peek is
    /// eligible for using the peek result stash.
    ///
    /// No rows are carried by this variant.
    UsePeekStash,
    /// The peek result is ready.
    ///
    /// The contained response is either the collected rows or an error.
    Ready(PeekResponse),
}
1608
/// The frontiers we have reported to the controller for a collection.
///
/// Each of the three frontiers is tracked independently as a
/// [`ReportedFrontier`].
#[derive(Debug)]
struct ReportedFrontiers {
    /// The reported write frontier.
    write_frontier: ReportedFrontier,
    /// The reported input frontier.
    input_frontier: ReportedFrontier,
    /// The reported output frontier.
    output_frontier: ReportedFrontier,
}
1619
1620impl ReportedFrontiers {
1621    /// Creates a new `ReportedFrontiers` instance.
1622    fn new() -> Self {
1623        Self {
1624            write_frontier: ReportedFrontier::new(),
1625            input_frontier: ReportedFrontier::new(),
1626            output_frontier: ReportedFrontier::new(),
1627        }
1628    }
1629}
1630
/// A frontier we have reported to the controller, or the least frontier we are allowed to report.
#[derive(Clone, Debug)]
pub enum ReportedFrontier {
    /// A frontier has been previously reported.
    ///
    /// Holds the frontier that was reported.
    Reported(Antichain<Timestamp>),
    /// No frontier has been reported yet.
    NotReported {
        /// A lower bound for frontiers that may be reported in the future.
        lower: Antichain<Timestamp>,
    },
}
1642
1643impl ReportedFrontier {
1644    /// Create a new `ReportedFrontier` enforcing the minimum lower bound.
1645    pub fn new() -> Self {
1646        let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
1647        Self::NotReported { lower }
1648    }
1649
1650    /// Whether the reported frontier is the empty frontier.
1651    pub fn is_empty(&self) -> bool {
1652        match self {
1653            Self::Reported(frontier) => frontier.is_empty(),
1654            Self::NotReported { .. } => false,
1655        }
1656    }
1657
1658    /// Whether this `ReportedFrontier` allows reporting the given frontier.
1659    ///
1660    /// A `ReportedFrontier` allows reporting of another frontier if:
1661    ///  * The other frontier is greater than the reported frontier.
1662    ///  * The other frontier is greater than or equal to the lower bound.
1663    fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
1664        match self {
1665            Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
1666            Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
1667        }
1668    }
1669}
1670
/// State maintained for a compute collection.
pub struct CollectionState {
    /// Tracks the frontiers that have been reported to the controller.
    reported_frontiers: ReportedFrontiers,
    /// The index of the dataflow computing this collection.
    ///
    /// Used for dropping the dataflow when the collection is dropped.
    /// The dataflow index is wrapped in an `Rc` and can be shared between collections, to reflect
    /// the possibility that a single dataflow can export multiple collections.
    dataflow_index: Rc<usize>,
    /// Whether this collection is a subscribe or copy-to.
    ///
    /// The compute protocol does not allow `Frontiers` responses for subscribe and copy-to
    /// collections, so we need to be able to recognize them. This is something we would like to
    /// change in the future (database-issues#4701).
    pub is_subscribe_or_copy: bool,
    /// The collection's initial as-of frontier.
    ///
    /// Used to determine hydration status: the collection counts as hydrated once its reported
    /// output frontier is strictly greater than this frontier.
    as_of: Antichain<Timestamp>,

    /// A token that should be dropped when this collection is dropped to clean up associated
    /// sink state.
    ///
    /// Only `Some` if the collection is a sink.
    pub sink_token: Option<SinkToken>,
    /// Frontier of sink writes.
    ///
    /// Only `Some` if the collection is a sink and *not* a subscribe.
    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
    /// Frontier probes for every input to the collection.
    ///
    /// Keyed by the ID of the input.
    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
    /// A probe reporting the frontier of times through which all collection outputs have been
    /// computed (but not necessarily written).
    ///
    /// `None` for collections with compute frontiers equal to their write frontiers.
    pub compute_probe: Option<probe::Handle<Timestamp>>,
    /// Logging state maintained for this collection.
    ///
    /// When `None`, frontier and hydration changes are not logged.
    logging: Option<CollectionLogging>,
    /// Metrics tracked for this collection.
    metrics: CollectionMetrics,
}
1713
1714impl CollectionState {
1715    fn new(
1716        dataflow_index: Rc<usize>,
1717        is_subscribe_or_copy: bool,
1718        as_of: Antichain<Timestamp>,
1719        metrics: CollectionMetrics,
1720    ) -> Self {
1721        Self {
1722            reported_frontiers: ReportedFrontiers::new(),
1723            dataflow_index,
1724            is_subscribe_or_copy,
1725            as_of,
1726            sink_token: None,
1727            sink_write_frontier: None,
1728            input_probes: Default::default(),
1729            compute_probe: None,
1730            logging: None,
1731            metrics,
1732        }
1733    }
1734
1735    /// Return the frontiers that have been reported to the controller.
1736    fn reported_frontiers(&self) -> &ReportedFrontiers {
1737        &self.reported_frontiers
1738    }
1739
1740    /// Reset all reported frontiers to the given value.
1741    pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
1742        self.reported_frontiers.write_frontier = frontier.clone();
1743        self.reported_frontiers.input_frontier = frontier.clone();
1744        self.reported_frontiers.output_frontier = frontier;
1745    }
1746
1747    /// Set the write frontier that has been reported to the controller.
1748    fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
1749        if let Some(logging) = &mut self.logging {
1750            let time = match &frontier {
1751                ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
1752                ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
1753            };
1754            logging.set_frontier(time);
1755        }
1756
1757        self.reported_frontiers.write_frontier = frontier;
1758    }
1759
1760    /// Set the input frontier that has been reported to the controller.
1761    fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
1762        // Use this opportunity to update our input frontier logging.
1763        if let Some(logging) = &mut self.logging {
1764            for (id, probe) in &self.input_probes {
1765                let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
1766                logging.set_import_frontier(*id, new_time);
1767            }
1768        }
1769
1770        self.reported_frontiers.input_frontier = frontier;
1771    }
1772
1773    /// Set the output frontier that has been reported to the controller.
1774    fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
1775        let already_hydrated = self.hydrated();
1776
1777        self.reported_frontiers.output_frontier = frontier;
1778
1779        if !already_hydrated && self.hydrated() {
1780            if let Some(logging) = &mut self.logging {
1781                logging.set_hydrated();
1782            }
1783            self.metrics.record_collection_hydrated();
1784        }
1785    }
1786
1787    /// Return whether this collection is hydrated.
1788    fn hydrated(&self) -> bool {
1789        match &self.reported_frontiers.output_frontier {
1790            ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
1791            ReportedFrontier::NotReported { .. } => false,
1792        }
1793    }
1794}
1795
/// State remembered about a dropped compute collection.
///
/// This is the subset of the full [`CollectionState`] that survives the invocation of
/// `drop_collection`, until it is finally dropped in `report_dropped_collections`. It includes any
/// information required to report the dropping of a collection to the controller.
///
/// Note that this state must _not_ store any state (such as tokens) whose dropping releases
/// resources elsewhere in the system. A `DroppedCollection` for a collection dropped during
/// reconciliation might be alive at the same time as the [`CollectionState`] for the re-created
/// collection, and if the dropped collection hasn't released all its held resources by the time
/// the new one is created, conflicts can ensue.
pub struct DroppedCollection {
    /// The frontiers that had been reported to the controller for the collection.
    reported_frontiers: ReportedFrontiers,
    /// Whether the collection was a subscribe or copy-to.
    is_subscribe_or_copy: bool,
}