// mz_compute/compute_state.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Worker-local state for compute timely instances.
7
8use std::any::Any;
9use std::cell::RefCell;
10use std::cmp::Ordering;
11use std::collections::{BTreeMap, BTreeSet};
12use std::num::NonZeroUsize;
13use std::rc::Rc;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use bytesize::ByteSize;
18use differential_dataflow::Hashable;
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::{Cursor, TraceReader};
21use mz_compute_client::logging::LoggingConfig;
22use mz_compute_client::protocol::command::{
23    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
24};
25use mz_compute_client::protocol::history::ComputeCommandHistory;
26use mz_compute_client::protocol::response::{
27    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, SubscribeResponse,
28};
29use mz_compute_types::dataflows::DataflowDescription;
30use mz_compute_types::dyncfgs::{
31    ENABLE_PEEK_RESPONSE_STASH, PEEK_RESPONSE_STASH_BATCH_MAX_RUNS,
32    PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE, PEEK_STASH_NUM_BATCHES,
33};
34use mz_compute_types::plan::render_plan::RenderPlan;
35use mz_dyncfg::ConfigSet;
36use mz_expr::SafeMfpPlan;
37use mz_expr::row::RowCollection;
38use mz_ore::cast::CastFrom;
39use mz_ore::collections::CollectionExt;
40use mz_ore::metrics::UIntGauge;
41use mz_ore::now::EpochMillis;
42use mz_ore::soft_panic_or_log;
43use mz_ore::task::AbortOnDropHandle;
44use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
45use mz_persist_client::Diagnostics;
46use mz_persist_client::cache::PersistClientCache;
47use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
48use mz_persist_client::read::ReadHandle;
49use mz_persist_types::PersistLocation;
50use mz_persist_types::codec_impls::UnitSchema;
51use mz_repr::fixed_length::ToDatumIter;
52use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
53use mz_storage_operators::stats::StatsCursor;
54use mz_storage_types::StorageDiff;
55use mz_storage_types::controller::CollectionMetadata;
56use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
57use mz_storage_types::sources::SourceData;
58use mz_storage_types::time_dependence::TimeDependence;
59use mz_txn_wal::operator::TxnsContext;
60use mz_txn_wal::txn_cache::TxnsCache;
61use timely::communication::Allocate;
62use timely::dataflow::operators::probe;
63use timely::order::PartialOrder;
64use timely::progress::frontier::Antichain;
65use timely::scheduling::Scheduler;
66use timely::worker::Worker as TimelyWorker;
67use tokio::sync::{oneshot, watch};
68use tracing::{Level, debug, error, info, span, trace, warn};
69use uuid::Uuid;
70
71use crate::arrangement::manager::{TraceBundle, TraceManager};
72use crate::logging;
73use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
74use crate::logging::initialize::LoggingTraces;
75use crate::metrics::{CollectionMetrics, WorkerMetrics};
76use crate::render::{LinearJoinSpec, StartSignal};
77use crate::server::{ComputeInstanceContext, ResponseSender};
78
79mod peek_result_iterator;
80mod peek_stash;
81
/// Worker-local state that is maintained across dataflows.
///
/// This state is restricted to the COMPUTE state, the deterministic, idempotent work
/// done between data ingress and egress.
pub struct ComputeState {
    /// State kept for each installed compute collection.
    ///
    /// Each collection has exactly one frontier.
    /// How the frontier is communicated depends on the collection type:
    ///  * Frontiers of indexes are equal to the frontier of their corresponding traces in the
    ///    `TraceManager`.
    ///  * Persist sinks store their current frontier in `CollectionState::sink_write_frontier`.
    ///  * Subscribes report their frontiers through the `subscribe_response_buffer`.
    pub collections: BTreeMap<GlobalId, CollectionState>,
    /// The traces available for sharing across dataflows.
    pub traces: TraceManager,
    /// Shared buffer with SUBSCRIBE operator instances by which they can respond.
    ///
    /// The entries are pairs of sink identifier (to identify the subscribe instance)
    /// and the response itself.
    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
    /// Shared buffer with S3 oneshot operator instances by which they can respond.
    ///
    /// The entries are pairs of sink identifier (to identify the s3 oneshot instance)
    /// and the response itself.
    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
    /// Peek commands that are awaiting fulfillment.
    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// The persist location where we can stash large peek results.
    ///
    /// `None` until the `CreateInstance` command has been handled.
    pub peek_stash_persist_location: Option<PersistLocation>,
    /// The logger, from Timely's logging framework, if logs are enabled.
    pub compute_logger: Option<logging::compute::Logger>,
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    /// This is intentionally shared between workers.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// History of commands received by this workers and all its peers.
    pub command_history: ComputeCommandHistory<UIntGauge>,
    /// Max size in bytes of any result.
    ///
    /// Initialized to `u64::MAX` and updated through `UpdateConfiguration` commands.
    max_result_size: u64,
    /// Specification for rendering linear joins.
    pub linear_join_spec: LinearJoinSpec,
    /// Metrics for this worker.
    pub metrics: WorkerMetrics,
    /// A process-global handle to tracing configuration.
    tracing_handle: Arc<TracingHandle>,
    /// Other configuration for compute.
    pub context: ComputeInstanceContext,
    /// Per-worker dynamic configuration.
    ///
    /// This is separate from the process-global `ConfigSet` and contains config options that need
    /// to be applied consistently with compute command order.
    ///
    /// For example, for options that influence dataflow rendering it is important that all workers
    /// render the same dataflow with the same options. If these options were stored in a global
    /// `ConfigSet`, we couldn't guarantee that all workers observe changes to them at the same
    /// point in the stream of compute commands. Storing per-worker configuration ensures that
    /// because each worker's configuration is only updated once that worker observes the
    /// respective `UpdateConfiguration` command.
    ///
    /// Reference-counted to avoid cloning for `Context`.
    pub worker_config: Rc<ConfigSet>,

    /// Collections awaiting schedule instruction by the controller.
    ///
    /// Each entry stores a reference to a token that can be dropped to unsuspend the collection's
    /// dataflow. Multiple collections can reference the same token if they are exported by the
    /// same dataflow.
    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,

    /// Interval at which to perform server maintenance tasks. Set to a zero interval to
    /// perform maintenance with every `step_or_park` invocation.
    pub server_maintenance_interval: Duration,

    /// The [`mz_ore::now::SYSTEM_TIME`] at which the replica was started.
    ///
    /// Used to compute `replica_expiration`.
    pub init_system_time: EpochMillis,

    /// The maximum time for which the replica is expected to live. If not empty, dataflows in the
    /// replica can drop diffs associated with timestamps beyond the replica expiration.
    /// The replica will panic if such dataflows are not dropped before the replica has expired.
    pub replica_expiration: Antichain<Timestamp>,
}
167
168impl ComputeState {
169    /// Construct a new `ComputeState`.
170    pub fn new(
171        persist_clients: Arc<PersistClientCache>,
172        txns_ctx: TxnsContext,
173        metrics: WorkerMetrics,
174        tracing_handle: Arc<TracingHandle>,
175        context: ComputeInstanceContext,
176    ) -> Self {
177        let traces = TraceManager::new(metrics.clone());
178        let command_history = ComputeCommandHistory::new(metrics.for_history());
179
180        Self {
181            collections: Default::default(),
182            traces,
183            subscribe_response_buffer: Default::default(),
184            copy_to_response_buffer: Default::default(),
185            pending_peeks: Default::default(),
186            peek_stash_persist_location: None,
187            compute_logger: None,
188            persist_clients,
189            txns_ctx,
190            command_history,
191            max_result_size: u64::MAX,
192            linear_join_spec: Default::default(),
193            metrics,
194            tracing_handle,
195            context,
196            worker_config: mz_dyncfgs::all_dyncfgs().into(),
197            suspended_collections: Default::default(),
198            server_maintenance_interval: Duration::ZERO,
199            init_system_time: mz_ore::now::SYSTEM_TIME(),
200            replica_expiration: Antichain::default(),
201        }
202    }
203
204    /// Return a mutable reference to the identified collection.
205    ///
206    /// Panics if the collection doesn't exist.
207    pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
208        self.collections
209            .get_mut(&id)
210            .expect("collection must exist")
211    }
212
213    /// Construct a new frontier probe for the given input and add it to the state of the given
214    /// collections.
215    ///
216    /// The caller is responsible for attaching the returned probe handle to the respective
217    /// dataflow input stream.
218    pub fn input_probe_for(
219        &mut self,
220        input_id: GlobalId,
221        collection_ids: impl Iterator<Item = GlobalId>,
222    ) -> probe::Handle<Timestamp> {
223        let probe = probe::Handle::default();
224        for id in collection_ids {
225            if let Some(collection) = self.collections.get_mut(&id) {
226                collection.input_probes.insert(input_id, probe.clone());
227            }
228        }
229        probe
230    }
231
232    /// Apply the current `worker_config` to the compute state.
233    fn apply_worker_config(&mut self) {
234        use mz_compute_types::dyncfgs::*;
235
236        let config = &self.worker_config;
237
238        self.linear_join_spec = LinearJoinSpec::from_config(config);
239
240        if ENABLE_LGALLOC.get(config) {
241            if let Some(path) = &self.context.scratch_directory {
242                let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
243                let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
244                let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
245                let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
246                let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
247                info!(
248                    ?path,
249                    backgrund_interval=?interval,
250                    clear_bytes,
251                    eager_return,
252                    file_growth_dampener,
253                    local_buffer_bytes,
254                    "enabling lgalloc"
255                );
256                let background_worker_config = lgalloc::BackgroundWorkerConfig {
257                    interval,
258                    clear_bytes,
259                };
260                lgalloc::lgalloc_set_config(
261                    lgalloc::LgAlloc::new()
262                        .enable()
263                        .with_path(path.clone())
264                        .with_background_config(background_worker_config)
265                        .eager_return(eager_return)
266                        .file_growth_dampener(file_growth_dampener)
267                        .local_buffer_bytes(local_buffer_bytes),
268                );
269            } else {
270                debug!("not enabling lgalloc, scratch directory not specified");
271            }
272        } else {
273            info!("disabling lgalloc");
274            lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
275        }
276
277        crate::memory_limiter::apply_limiter_config(config);
278
279        mz_ore::region::ENABLE_LGALLOC_REGION.store(
280            ENABLE_COLUMNATION_LGALLOC.get(config),
281            std::sync::atomic::Ordering::Relaxed,
282        );
283
284        let enable_columnar_lgalloc = ENABLE_COLUMNAR_LGALLOC.get(config);
285        mz_timely_util::containers::set_enable_columnar_lgalloc(enable_columnar_lgalloc);
286
287        // Remember the maintenance interval locally to avoid reading it from the config set on
288        // every server iteration.
289        self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);
290
291        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
292        match overflowing_behavior.parse() {
293            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
294            Err(err) => {
295                error!(
296                    err,
297                    overflowing_behavior, "Invalid value for ore_overflowing_behavior"
298                );
299            }
300        }
301    }
302
303    /// Apply the provided replica expiration `offset` by converting it to a frontier relative to
304    /// the replica's initialization system time.
305    ///
306    /// Only expected to be called once when creating the instance. Guards against calling it
307    /// multiple times by checking if the local expiration time is set.
308    pub fn apply_expiration_offset(&mut self, offset: Duration) {
309        if self.replica_expiration.is_empty() {
310            let offset: EpochMillis = offset
311                .as_millis()
312                .try_into()
313                .expect("duration must fit within u64");
314            let replica_expiration_millis = self.init_system_time + offset;
315            let replica_expiration = Timestamp::from(replica_expiration_millis);
316
317            info!(
318                offset = %offset,
319                replica_expiration_millis = %replica_expiration_millis,
320                replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
321                "setting replica expiration",
322            );
323            self.replica_expiration = Antichain::from_elem(replica_expiration);
324
325            // Record the replica expiration in the metrics.
326            self.metrics
327                .replica_expiration_timestamp_seconds
328                .set(replica_expiration.into());
329        }
330    }
331
332    /// Returns the cc or non-cc version of "dataflow_max_inflight_bytes", as
333    /// appropriate to this replica.
334    pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
335        use mz_compute_types::dyncfgs::{
336            DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
337        };
338
339        if self.persist_clients.cfg.is_cc_active {
340            DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
341        } else {
342            DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
343        }
344    }
345}
346
/// A wrapper around [ComputeState] with a live timely worker and response channel.
///
/// The command handlers live on this type because they need mutable access to both the
/// compute state and the timely worker, plus a way to send responses to the controller.
pub(crate) struct ActiveComputeState<'a, A: Allocate> {
    /// The underlying Timely worker.
    pub timely_worker: &'a mut TimelyWorker<A>,
    /// The compute state itself.
    pub compute_state: &'a mut ComputeState,
    /// The channel over which frontier information is reported.
    pub response_tx: &'a mut ResponseSender,
}
356
/// A token that keeps a sink alive.
///
/// Dropping the token drops the wrapped object, whose own drop logic tears the sink down.
pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);

impl SinkToken {
    /// Wrap `t` into a new `SinkToken`.
    pub fn new(t: Box<dyn Any>) -> Self {
        SinkToken(t)
    }
}
366
367impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
    /// Entrypoint for applying a compute command.
    ///
    /// Pushes the command onto the command history, then dispatches to the matching
    /// `handle_*` method while timing the handling per worker and command kind.
    #[mz_ore::instrument(level = "debug")]
    pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
        use ComputeCommand::*;

        self.compute_state.command_history.push(cmd.clone());

        // Record the command duration, per worker and command kind.
        let timer = self
            .compute_state
            .metrics
            .handle_command_duration_seconds
            .for_command(&cmd)
            .start_timer();

        match cmd {
            // `Hello` is expected to be intercepted before commands reach this handler.
            Hello { .. } => panic!("Hello must be captured before"),
            CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
            // No worker-side action required.
            InitializationComplete => (),
            UpdateConfiguration(params) => self.handle_update_configuration(*params),
            CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
            Schedule(id) => self.handle_schedule(id),
            AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
            Peek(peek) => {
                // Link tracing spans for this peek to the controller's OpenTelemetry context.
                peek.otel_ctx.attach_as_parent();
                self.handle_peek(*peek)
            }
            CancelPeek { uuid } => self.handle_cancel_peek(uuid),
            AllowWrites(id) => {
                self.handle_allow_writes(id);
            }
        }

        timer.observe_duration();
    }
403
    /// Handle a `CreateInstance` command: apply the initial worker configuration, optionally
    /// set the replica expiration, initialize logging, and record the peek-stash location.
    fn handle_create_instance(&mut self, config: InstanceConfig) {
        // Ensure the state is consistent with the config before we initialize anything.
        self.compute_state.apply_worker_config();
        if let Some(offset) = config.expiration_offset {
            self.compute_state.apply_expiration_offset(offset);
        }

        // Panics if logging was already initialized (see `initialize_logging`).
        self.initialize_logging(config.logging);

        self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
    }
415
    /// Handle an `UpdateConfiguration` command by applying the contained parameters to the
    /// worker-local state, the tracing/persist configuration, and process-global metrics.
    fn handle_update_configuration(&mut self, params: ComputeParameters) {
        debug!("Applying configuration update: {params:?}");

        // Destructure exhaustively so adding a field to `ComputeParameters` forces an update
        // of this method.
        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client: _grpc_client,
            dyncfg_updates,
        } = params;

        if let Some(v) = workload_class {
            self.compute_state.metrics.set_workload_class(v);
        }
        if let Some(v) = max_result_size {
            self.compute_state.max_result_size = v;
        }

        tracing.apply(self.compute_state.tracing_handle.as_ref());

        // Apply dyncfg updates to both the per-worker config and the persist client config.
        dyncfg_updates.apply(&self.compute_state.worker_config);
        self.compute_state
            .persist_clients
            .cfg()
            .apply_from(&dyncfg_updates);

        // Note: We're only updating mz_metrics from the compute state here, but not from the
        // equivalent storage state. This is because they're running on the same process and
        // share the metrics.
        mz_metrics::update_dyncfg(&dyncfg_updates);

        // Re-derive all state that depends on the (possibly changed) worker config.
        self.compute_state.apply_worker_config();
    }
449
    /// Handle a `CreateDataflow` command: set up per-collection state for every export of the
    /// dataflow, suspend the dataflow until it is scheduled, and render it on this worker.
    ///
    /// Panics if the dataflow has no `as_of`.
    fn handle_create_dataflow(
        &mut self,
        dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    ) {
        // The index is shared by all exported collections; `Rc::try_unwrap` in
        // `drop_collection` detects when the last export is gone.
        let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());
        let as_of = dataflow.as_of.clone().unwrap();

        // Dataflows without a time dependence get the default (empty) expiration frontier.
        let dataflow_expiration = dataflow
            .time_dependence
            .as_ref()
            .map(|time_dependence| {
                self.determine_dataflow_expiration(time_dependence, &dataflow.until)
            })
            .unwrap_or_default();

        // Add the dataflow expiration to `until`.
        let until = dataflow.until.meet(&dataflow_expiration);

        // Log at `debug!` for transient dataflows and `info!` otherwise, with identical
        // fields; the tracing macros require the level to be chosen statically.
        if dataflow.is_transient() {
            debug!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration
                    .as_option()
                    .map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        } else {
            info!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration
                    .as_option()
                    .map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        };

        // Exports that are subscribes or copy-tos are treated specially by frontier reporting.
        let subscribe_copy_ids: BTreeSet<_> = dataflow
            .subscribe_ids()
            .chain(dataflow.copy_to_ids())
            .collect();

        // Initialize compute and logging state for each object.
        for object_id in dataflow.export_ids() {
            let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
            let metrics = self.compute_state.metrics.for_collection(object_id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of.clone(),
                metrics,
            );

            if let Some(logger) = self.compute_state.compute_logger.clone() {
                let logging = CollectionLogging::new(
                    object_id,
                    logger,
                    *dataflow_index,
                    dataflow.import_ids(),
                );
                collection.logging = Some(logging);
            }

            // Nothing has been reported yet; frontiers start at the dataflow's as-of.
            collection.reset_reported_frontiers(ReportedFrontier::NotReported {
                lower: as_of.clone(),
            });

            let existing = self.compute_state.collections.insert(object_id, collection);
            if existing.is_some() {
                error!(
                    id = ?object_id,
                    "existing collection for newly created dataflow",
                );
            }
        }

        // Suspend the dataflow until the controller sends `Schedule` for all its exports;
        // every export shares the single suspension token.
        let (start_signal, suspension_token) = StartSignal::new();
        for id in dataflow.export_ids() {
            self.compute_state
                .suspended_collections
                .insert(id, Rc::clone(&suspension_token));
        }

        crate::render::build_compute_dataflow(
            self.timely_worker,
            self.compute_state,
            dataflow,
            start_signal,
            until,
            dataflow_expiration,
        );
    }
555
556    fn handle_schedule(&mut self, id: GlobalId) {
557        // A `Schedule` command instructs us to begin dataflow computation for a collection, so
558        // we should unsuspend it by dropping the corresponding suspension token. Note that a
559        // dataflow can export multiple collections and they all share one suspension token, so the
560        // computation of a dataflow will only start once all its exported collections have been
561        // scheduled.
562        let suspension_token = self.compute_state.suspended_collections.remove(&id);
563        drop(suspension_token);
564    }
565
566    fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
567        if frontier.is_empty() {
568            // Indicates that we may drop `id`, as there are no more valid times to read.
569            self.drop_collection(id);
570        } else {
571            self.compute_state
572                .traces
573                .allow_compaction(id, frontier.borrow());
574        }
575    }
576
577    #[mz_ore::instrument(level = "debug")]
578    fn handle_peek(&mut self, peek: Peek) {
579        let pending = match &peek.target {
580            PeekTarget::Index { id } => {
581                // Acquire a copy of the trace suitable for fulfilling the peek.
582                let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
583                PendingPeek::index(peek, trace_bundle)
584            }
585            PeekTarget::Persist { metadata, .. } => {
586                let metadata = metadata.clone();
587                PendingPeek::persist(
588                    peek,
589                    Arc::clone(&self.compute_state.persist_clients),
590                    metadata,
591                    usize::cast_from(self.compute_state.max_result_size),
592                    self.timely_worker,
593                )
594            }
595        };
596
597        // Log the receipt of the peek.
598        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
599            logger.log(&pending.as_log_event(true));
600        }
601
602        self.process_peek(&mut Antichain::new(), pending);
603    }
604
605    fn handle_cancel_peek(&mut self, uuid: Uuid) {
606        if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
607            self.send_peek_response(peek, PeekResponse::Canceled);
608        }
609    }
610
611    fn handle_allow_writes(&mut self, id: GlobalId) {
612        // Enable persist compaction on any allow-writes command. We
613        // assume persist only compacts after making durable changes,
614        // such as appending a batch or advancing the upper.
615        self.compute_state.persist_clients.cfg().enable_compaction();
616
617        if let Some(collection) = self.compute_state.collections.get_mut(&id) {
618            collection.allow_writes();
619        } else {
620            soft_panic_or_log!("allow writes for unknown collection {id}");
621        }
622    }
623
    /// Drop the given collection.
    ///
    /// Removes all worker-local state for the collection and, if this was the last export of
    /// its dataflow, drops the dataflow too. Unless the collection is a subscribe or copy-to,
    /// final empty frontiers are reported to the controller as the compute protocol requires.
    ///
    /// Panics if the collection is not tracked by this worker.
    fn drop_collection(&mut self, id: GlobalId) {
        let collection = self
            .compute_state
            .collections
            .remove(&id)
            .expect("dropped untracked collection");

        // If this collection is an index, remove its trace.
        self.compute_state.traces.remove(&id);
        // If the collection is unscheduled, remove it from the list of waiting collections.
        self.compute_state.suspended_collections.remove(&id);

        // Drop the dataflow, if all its exports have been dropped.
        // (`Rc::try_unwrap` succeeds only for the last remaining reference to the index.)
        if let Ok(index) = Rc::try_unwrap(collection.dataflow_index) {
            self.timely_worker.drop_dataflow(index);
        }

        // The compute protocol requires us to send a `Frontiers` response with empty frontiers
        // when a collection was dropped, unless:
        //  * The frontier was already reported as empty previously, or
        //  * The collection is a subscribe or copy-to.
        if !collection.is_subscribe_or_copy {
            let reported = collection.reported_frontiers;
            // For each frontier kind, report the empty frontier only if the previously
            // reported frontier was not already empty.
            let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
            let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
            let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);

            let frontiers = FrontiersResponse {
                write_frontier,
                input_frontier,
                output_frontier,
            };
            if frontiers.has_updates() {
                self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
            }
        }
    }
662
    /// Initializes timely dataflow logging and publishes as a view.
    ///
    /// Installs the logging dataflow, registers each produced logging trace as a maintained
    /// index, and sets up collection state (including self-logging) for each logging export.
    ///
    /// Panics if logging was already initialized, or if `config.index_logs` requests logs
    /// that `logging::initialize` does not produce.
    pub fn initialize_logging(&mut self, config: LoggingConfig) {
        if self.compute_state.compute_logger.is_some() {
            panic!("dataflow server has already initialized logging");
        }

        let LoggingTraces {
            traces,
            dataflow_index,
            compute_logger: logger,
        } = logging::initialize(self.timely_worker, &config);

        // All logging collections share the single logging dataflow's index.
        let dataflow_index = Rc::new(dataflow_index);
        let mut log_index_ids = config.index_logs;
        for (log, trace) in traces {
            // Install trace as maintained index.
            let id = log_index_ids
                .remove(&log)
                .expect("`logging::initialize` does not invent logs");
            self.compute_state.traces.set(id, trace);

            // Initialize compute and logging state for the logging index.
            let is_subscribe_or_copy = false;
            // Logging collections are valid from the minimum timestamp.
            let as_of = Antichain::from_elem(Timestamp::MIN);
            let metrics = self.compute_state.metrics.for_collection(id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of,
                metrics,
            );

            // Logging dataflows have no imports, hence the empty import iterator.
            let logging =
                CollectionLogging::new(id, logger.clone(), *dataflow_index, std::iter::empty());
            collection.logging = Some(logging);

            let existing = self.compute_state.collections.insert(id, collection);
            if existing.is_some() {
                error!(
                    id = ?id,
                    "existing collection for newly initialized logging export",
                );
            }
        }

        // Sanity check: every requested logging index must have been produced above.
        assert!(
            log_index_ids.is_empty(),
            "failed to create requested logging indexes: {log_index_ids:?}",
        );

        self.compute_state.compute_logger = Some(logger);
    }
716
    /// Send progress information to the controller.
    ///
    /// For each tracked collection (except subscribes and copy-tos, which report
    /// progress through their own response types), collects the current write,
    /// input, and output frontiers, records them as reported, and emits a
    /// `Frontiers` response for every collection whose frontiers have advanced
    /// since the last report.
    pub fn report_frontiers(&mut self) {
        let mut responses = Vec::new();

        // Maintain a single allocation for `new_frontier` to avoid allocating on every iteration.
        let mut new_frontier = Antichain::new();

        for (&id, collection) in self.compute_state.collections.iter_mut() {
            // The compute protocol does not allow `Frontiers` responses for subscribe and copy-to
            // collections (database-issues#4701).
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers();

            // Collect the write frontier and check for progress.
            //
            // The write frontier comes either from the collection's trace (for indexes) or from
            // its sink (for sink exports); a collection must not have both sources.
            new_frontier.clear();
            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
                assert!(
                    collection.sink_write_frontier.is_none(),
                    "collection {id} has multiple frontiers"
                );
                traces.oks_mut().read_upper(&mut new_frontier);
            } else if let Some(frontier) = &collection.sink_write_frontier {
                new_frontier.clone_from(&frontier.borrow());
            } else {
                error!(id = ?id, "collection without write frontier");
                continue;
            }
            // Only report a write frontier that has advanced since the last report.
            let new_write_frontier = reported
                .write_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Collect the output frontier and check for progress.
            //
            // By default, the output frontier equals the write frontier (which is still stored in
            // `new_frontier`). If the collection provides a compute frontier, we construct the
            // output frontier by taking the meet of write and compute frontier, to avoid:
            //  * reporting progress through times we have not yet written
            //  * reporting progress through times we have not yet fully processed, for
            //    collections that jump their write frontiers into the future
            if let Some(probe) = &collection.compute_probe {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_output_frontier = reported
                .output_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Collect the input frontier and check for progress.
            //
            // The input frontier is the combination of all input probe frontiers.
            new_frontier.clear();
            for probe in collection.input_probes.values() {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_input_frontier = reported
                .input_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Record what we are about to report, so subsequent calls only report
            // further advancement.
            if let Some(frontier) = &new_write_frontier {
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_input_frontier {
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_output_frontier {
                collection
                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
            }

            let response = FrontiersResponse {
                write_frontier: new_write_frontier,
                input_frontier: new_input_frontier,
                output_frontier: new_output_frontier,
            };
            // Skip the response entirely when nothing advanced.
            if response.has_updates() {
                responses.push((id, response));
            }
        }

        for (id, frontiers) in responses {
            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
        }
    }
805
806    /// Report per-worker metrics.
807    pub(crate) fn report_metrics(&self) {
808        if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
809            let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
810            let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
811            let remaining = expiration - now;
812            self.compute_state
813                .metrics
814                .replica_expiration_remaining_seconds
815                .set(remaining)
816        }
817    }
818
    /// Either complete the peek (and send the response) or put it in the pending set.
    ///
    /// Index peeks may also transition into stashed peeks: when fulfillment
    /// reports `PeekStatus::UsePeekStash`, an async task uploads the response
    /// to the peek stash in Persist and the peek is re-inserted as
    /// `PendingPeek::Stash`.
    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
        let response = match &mut peek {
            PendingPeek::Index(peek) => {
                let start = Instant::now();

                // Only peeks whose finishing can be applied in a streaming
                // fashion are eligible for the peek stash.
                let peek_stash_eligible = peek
                    .peek
                    .finishing
                    .is_streamable(peek.peek.result_desc.arity());

                // The stash is usable only when both the feature flag is on and
                // a stash persist location has been configured.
                let peek_stash_enabled = {
                    let enabled = ENABLE_PEEK_RESPONSE_STASH.get(&self.compute_state.worker_config);
                    let peek_persist_stash_available =
                        self.compute_state.peek_stash_persist_location.is_some();
                    if !peek_persist_stash_available && enabled {
                        error!("missing peek_stash_persist_location but peek stash is enabled");
                    }
                    enabled && peek_persist_stash_available
                };

                let peek_stash_threshold_bytes =
                    PEEK_RESPONSE_STASH_THRESHOLD_BYTES.get(&self.compute_state.worker_config);

                // Histograms instrumenting the phases of index peek processing.
                let metrics = IndexPeekMetrics {
                    seek_fulfillment_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_seek_fulfillment_seconds,
                    frontier_check_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_frontier_check_seconds,
                    error_scan_seconds: &self.compute_state.metrics.index_peek_error_scan_seconds,
                    cursor_setup_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_cursor_setup_seconds,
                    row_iteration_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_iteration_seconds,
                    result_sort_seconds: &self.compute_state.metrics.index_peek_result_sort_seconds,
                    row_collection_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_collection_seconds,
                };

                let status = peek.seek_fulfillment(
                    upper,
                    self.compute_state.max_result_size,
                    peek_stash_enabled && peek_stash_eligible,
                    peek_stash_threshold_bytes,
                    &metrics,
                );

                self.compute_state
                    .metrics
                    .index_peek_total_seconds
                    .observe(start.elapsed().as_secs_f64());

                match status {
                    PeekStatus::Ready(result) => Some(result),
                    PeekStatus::NotReady => None,
                    // The result should be uploaded to the peek stash instead
                    // of being returned directly: start the upload task and
                    // re-track the peek as a `Stash` peek.
                    PeekStatus::UsePeekStash => {
                        let _span =
                            span!(parent: &peek.span, Level::DEBUG, "process_stash_peek").entered();

                        let peek_stash_batch_max_runs = PEEK_RESPONSE_STASH_BATCH_MAX_RUNS
                            .get(&self.compute_state.worker_config);

                        let stash_task = peek_stash::StashingPeek::start_upload(
                            Arc::clone(&self.compute_state.persist_clients),
                            self.compute_state
                                .peek_stash_persist_location
                                .as_ref()
                                .expect("verified above"),
                            peek.peek.clone(),
                            peek.trace_bundle.clone(),
                            peek_stash_batch_max_runs,
                        );

                        self.compute_state
                            .pending_peeks
                            .insert(peek.peek.uuid, PendingPeek::Stash(stash_task));
                        return;
                    }
                }
            }
            // Persist peeks resolve in a background task; check whether its
            // result has arrived yet.
            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
                self.compute_state
                    .metrics
                    .persist_peek_seconds
                    .observe(duration.as_secs_f64());
                result
            }),
            PendingPeek::Stash(stashing_peek) => {
                // Feed more rows to the upload task, then check whether it has
                // finished stashing the full response.
                let num_batches = PEEK_STASH_NUM_BATCHES.get(&self.compute_state.worker_config);
                let batch_size = PEEK_STASH_BATCH_SIZE.get(&self.compute_state.worker_config);
                stashing_peek.pump_rows(num_batches, batch_size);

                if let Ok((response, duration)) = stashing_peek.result.try_recv() {
                    self.compute_state
                        .metrics
                        .stashed_peek_seconds
                        .observe(duration.as_secs_f64());
                    trace!(?stashing_peek.peek, ?duration, "finished stashing peek response in persist");

                    Some(response)
                } else {
                    None
                }
            }
        };

        if let Some(response) = response {
            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek_response").entered();
            self.send_peek_response(peek, response)
        } else {
            // Not ready yet; retry on a subsequent `process_peeks` pass.
            let uuid = peek.peek().uuid;
            self.compute_state.pending_peeks.insert(uuid, peek);
        }
    }
943
944    /// Scan pending peeks and attempt to retire each.
945    pub fn process_peeks(&mut self) {
946        let mut upper = Antichain::new();
947        let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
948        for (_uuid, peek) in pending_peeks {
949            self.process_peek(&mut upper, peek);
950        }
951    }
952
953    /// Sends a response for this peek's resolution to the coordinator.
954    ///
955    /// Note that this function takes ownership of the `PendingPeek`, which is
956    /// meant to prevent multiple responses to the same peek.
957    #[mz_ore::instrument(level = "debug")]
958    fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
959        let log_event = peek.as_log_event(false);
960        // Respond with the response.
961        self.send_compute_response(ComputeResponse::PeekResponse(
962            peek.peek().uuid,
963            response,
964            OpenTelemetryContext::obtain(),
965        ));
966
967        // Log responding to the peek request.
968        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
969            logger.log(&log_event);
970        }
971    }
972
    /// Scan the shared subscribe response buffer, and forward results along.
    ///
    /// Subscribes report progress through their `SubscribeResponse`s rather
    /// than through `Frontiers` responses, so this method also advances the
    /// reported frontiers of the corresponding collections.
    pub fn process_subscribes(&mut self) {
        let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
        for (sink_id, mut response) in subscribe_responses.drain(..) {
            // Update frontier logging for this subscribe.
            if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
                // A dropped subscribe advances to the empty frontier.
                let new_frontier = match &response {
                    SubscribeResponse::Batch(b) => b.upper.clone(),
                    SubscribeResponse::DroppedAt(_) => Antichain::new(),
                };

                // Frontiers reported by a subscribe must never regress.
                let reported = collection.reported_frontiers();
                assert!(
                    reported.write_frontier.allows_reporting(&new_frontier),
                    "subscribe write frontier regression: {:?} -> {:?}",
                    reported.write_frontier,
                    new_frontier,
                );
                assert!(
                    reported.input_frontier.allows_reporting(&new_frontier),
                    "subscribe input frontier regression: {:?} -> {:?}",
                    reported.input_frontier,
                    new_frontier,
                );

                // Advance all three reported frontiers to the new frontier.
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
            } else {
                // Presumably tracking state for this subscribe was already dropped by
                // `drop_collection`. There is nothing left to do for logging.
            }

            // Replace over-large responses with an error before sending.
            response
                .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
            self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
        }
    }
1013
1014    /// Scan the shared copy to response buffer, and forward results along.
1015    pub fn process_copy_tos(&self) {
1016        let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
1017        for (sink_id, response) in responses.drain(..) {
1018            self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
1019        }
1020    }
1021
1022    /// Send a response to the coordinator.
1023    fn send_compute_response(&self, response: ComputeResponse) {
1024        // Ignore send errors because the coordinator is free to ignore our
1025        // responses. This happens during shutdown.
1026        let _ = self.response_tx.send(response);
1027    }
1028
1029    /// Checks for dataflow expiration. Panics if we're past the replica expiration time.
1030    pub(crate) fn check_expiration(&self) {
1031        let now = mz_ore::now::SYSTEM_TIME();
1032        if self.compute_state.replica_expiration.less_than(&now.into()) {
1033            let now_datetime = mz_ore::now::to_datetime(now);
1034            let expiration_datetime = self
1035                .compute_state
1036                .replica_expiration
1037                .as_option()
1038                .map(Into::into)
1039                .map(mz_ore::now::to_datetime);
1040
1041            // We error and assert separately to produce structured logs in anything that depends
1042            // on tracing.
1043            error!(
1044                now,
1045                now_datetime = ?now_datetime,
1046                expiration = ?self.compute_state.replica_expiration.elements(),
1047                expiration_datetime = ?expiration_datetime,
1048                "replica expired"
1049            );
1050
1051            // Repeat condition for better error message.
1052            assert!(
1053                !self.compute_state.replica_expiration.less_than(&now.into()),
1054                "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
1055                self.compute_state.replica_expiration.elements(),
1056            );
1057        }
1058    }
1059
1060    /// Returns the dataflow expiration, i.e, the timestamp beyond which diffs can be
1061    /// dropped.
1062    ///
1063    /// Returns an empty timestamp if `replica_expiration` is unset or matches conditions under
1064    /// which dataflow expiration should be disabled.
1065    pub fn determine_dataflow_expiration(
1066        &self,
1067        time_dependence: &TimeDependence,
1068        until: &Antichain<mz_repr::Timestamp>,
1069    ) -> Antichain<mz_repr::Timestamp> {
1070        // Evaluate time dependence with respect to the expiration time.
1071        // * Step time forward to ensure the expiration time is different to the moment a dataflow
1072        //   can legitimately jump to.
1073        // * We cannot expire dataflow with an until that is less or equal to the expiration time.
1074        let iter = self
1075            .compute_state
1076            .replica_expiration
1077            .iter()
1078            .filter_map(|t| time_dependence.apply(*t))
1079            .filter_map(|t| mz_repr::Timestamp::try_step_forward(&t))
1080            .filter(|expiration| !until.less_equal(expiration));
1081        Antichain::from_iter(iter)
1082    }
1083}
1084
/// A peek against either an index or a Persist collection.
///
/// Note that `PendingPeek` intentionally does not implement or derive `Clone`,
/// as each `PendingPeek` is meant to be dropped after it's responded to.
pub enum PendingPeek {
    /// A peek against an index. (Possibly a temporary index created for the purpose.)
    Index(IndexPeek),
    /// A peek against a Persist-backed collection.
    Persist(PersistPeek),
    /// A peek against an index that is being stashed in the peek stash by an
    /// async background task.
    ///
    /// Index peeks transition into this state when fulfillment reports
    /// `PeekStatus::UsePeekStash`.
    Stash(peek_stash::StashingPeek),
}
1098
impl PendingPeek {
    /// Produces a corresponding log event.
    ///
    /// The `installed` flag is recorded verbatim in the emitted `PeekEvent`.
    pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
        let peek = self.peek();
        // Derive the logged collection id and peek type from the peek's target.
        let (id, peek_type) = match &peek.target {
            PeekTarget::Index { id } => (*id, logging::compute::PeekType::Index),
            PeekTarget::Persist { id, .. } => (*id, logging::compute::PeekType::Persist),
        };
        let uuid = peek.uuid.into_bytes();
        ComputeEvent::Peek(PeekEvent {
            id,
            time: peek.timestamp,
            uuid,
            peek_type,
            installed,
        })
    }

    /// Creates a pending peek against an index, backed by the given trace bundle.
    fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
        let empty_frontier = Antichain::new();
        let timestamp_frontier = Antichain::from_elem(peek.timestamp);
        // Allow logical compaction up to the peek timestamp; the peek only
        // reads the traces' contents as of that time.
        trace_bundle
            .oks_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        // Release the physical compaction capability entirely (empty frontier).
        trace_bundle
            .oks_mut()
            .set_physical_compaction(empty_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_physical_compaction(empty_frontier.borrow());

        PendingPeek::Index(IndexPeek {
            peek,
            trace_bundle,
            span: tracing::Span::current(),
        })
    }

    /// Creates a pending peek against a Persist-backed collection.
    ///
    /// Spawns a background task that produces the peek result. Only a single,
    /// deterministically chosen worker performs the actual read; the task on
    /// every other worker completes immediately with an empty result.
    fn persist<A: Allocate>(
        peek: Peek,
        persist_clients: Arc<PersistClientCache>,
        metadata: CollectionMetadata,
        max_result_size: usize,
        timely_worker: &TimelyWorker<A>,
    ) -> Self {
        let active_worker = {
            // Choose the worker that does the actual peek arbitrarily but consistently.
            let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
            chosen_index == timely_worker.index()
        };
        // Used to wake the timely worker once the task has produced its result.
        let activator = timely_worker.sync_activator_for([].into());
        let peek_uuid = peek.uuid;

        let (result_tx, result_rx) = oneshot::channel();
        let timestamp = peek.timestamp;
        let mfp_plan = peek.map_filter_project.clone();
        // The read can stop early once `limit + offset` rows have been produced.
        let max_results_needed = peek
            .finishing
            .limit
            .map(|l| usize::cast_from(u64::from(l)))
            .unwrap_or(usize::MAX)
            + peek.finishing.offset;
        let order_by = peek.finishing.order_by.clone();

        // Persist peeks can include at most one literal constraint.
        let literal_constraint = peek
            .literal_constraints
            .clone()
            .map(|rows| rows.into_element());

        let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
            let start = Instant::now();
            let result = if active_worker {
                PersistPeek::do_peek(
                    &persist_clients,
                    metadata,
                    timestamp,
                    literal_constraint,
                    mfp_plan,
                    max_result_size,
                    max_results_needed,
                )
                .await
            } else {
                // Non-active workers contribute an empty result.
                Ok(vec![])
            };
            let result = match result {
                Ok(rows) => PeekResponse::Rows(vec![RowCollection::new(rows, &order_by)]),
                Err(e) => PeekResponse::Error(e.to_string()),
            };
            // A send error means the receiver was dropped, i.e. the peek was
            // cancelled; the result can simply be discarded.
            match result_tx.send((result, start.elapsed())) {
                Ok(()) => {}
                Err((_result, elapsed)) => {
                    debug!(duration =? elapsed, "dropping result for cancelled peek {peek_uuid}")
                }
            }
            match activator.activate() {
                Ok(()) => {}
                Err(_) => {
                    debug!("unable to wake timely after completed peek {peek_uuid}");
                }
            }
        });
        PendingPeek::Persist(PersistPeek {
            peek,
            // Aborts the background task if the peek is dropped before completion.
            _abort_handle: task_handle.abort_on_drop(),
            result: result_rx,
            span: tracing::Span::current(),
        })
    }

    /// Returns the `tracing::Span` tracking this peek's operation.
    fn span(&self) -> &tracing::Span {
        match self {
            PendingPeek::Index(p) => &p.span,
            PendingPeek::Persist(p) => &p.span,
            PendingPeek::Stash(p) => &p.span,
        }
    }

    /// Returns the `Peek` description underlying this pending peek.
    pub(crate) fn peek(&self) -> &Peek {
        match self {
            PendingPeek::Index(p) => &p.peek,
            PendingPeek::Persist(p) => &p.peek,
            PendingPeek::Stash(p) => &p.peek,
        }
    }
}
1229
/// An in-progress Persist peek.
///
/// Note that `PersistPeek` intentionally does not implement or derive `Clone`,
/// as each `PersistPeek` is meant to be dropped after it's responded to.
pub struct PersistPeek {
    pub(crate) peek: Peek,
    /// A background task that's responsible for producing the peek results.
    /// If we're no longer interested in the results, we abort the task.
    _abort_handle: AbortOnDropHandle<()>,
    /// The result of the background task, eventually.
    ///
    /// Alongside the response, the task reports how long producing it took.
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    /// The `tracing::Span` tracking this peek's operation
    span: tracing::Span,
}
1244
impl PersistPeek {
    /// Reads the peek result directly from the Persist shard described by
    /// `metadata`, as of `as_of`, applying `mfp_plan` to each row.
    ///
    /// Returns the resulting rows with their multiplicities, or an error
    /// string if the read fails, the data contains negative multiplicities,
    /// or the accumulated result exceeds `max_result_size` bytes.
    ///
    /// Reading stops early once `limit_remaining` rows have been produced or,
    /// when a literal constraint is present, once rows beyond the constrained
    /// key have been reached.
    async fn do_peek(
        persist_clients: &PersistClientCache,
        metadata: CollectionMetadata,
        as_of: Timestamp,
        literal_constraint: Option<Row>,
        mfp_plan: SafeMfpPlan,
        max_result_size: usize,
        mut limit_remaining: usize,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
        let client = persist_clients
            .open(metadata.persist_location)
            .await
            .map_err(|e| e.to_string())?;

        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
            .open_leased_reader(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("persist::peek"),
                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
            )
            .await
            .map_err(|e| e.to_string())?;

        // If we are using txn-wal for this collection, then the upper might
        // be advanced lazily and we have to go through txn-wal for reads.
        //
        // TODO: If/when we have a process-wide TxnsRead worker for clusterd,
        // use in here (instead of opening a new TxnsCache) to save a persist
        // reader registration and some txns shard read traffic.
        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
        } else {
            None
        };

        let metrics = client.metrics();

        // Reading fails if the shard's since frontier has advanced past `as_of`.
        let mut cursor = StatsCursor::new(
            &mut reader,
            txns_read.as_mut(),
            metrics,
            &mfp_plan,
            &metadata.relation_desc,
            Antichain::from_elem(as_of),
        )
        .await
        .map_err(|since| {
            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
        })?;

        // Re-used state for processing and building rows.
        let mut result = vec![];
        let mut datum_vec = DatumVec::new();
        let mut row_builder = Row::default();
        let arena = RowArena::new();
        let mut total_size = 0usize;

        // Number of leading columns constrained by the literal, if any.
        let literal_len = match &literal_constraint {
            None => 0,
            Some(row) => row.iter().count(),
        };

        'collect: while limit_remaining > 0 {
            let Some(batch) = cursor.next().await else {
                break;
            };
            for (data, _, d) in batch {
                let row = data.map_err(|e| e.to_string())?;

                if let Some(literal) = &literal_constraint {
                    // NOTE(review): the early `break 'collect` on `Greater`
                    // assumes the cursor yields rows ordered by key — confirm
                    // against `StatsCursor`'s contract.
                    match row.iter().take(literal_len).cmp(literal.iter()) {
                        Ordering::Less => continue,
                        Ordering::Equal => {}
                        Ordering::Greater => break 'collect,
                    }
                }

                // A negative multiplicity indicates invalid source data.
                let count: usize = d.try_into().map_err(|_| {
                    error!(
                        shard = %metadata.data_shard, diff = d, ?row,
                        "persist peek encountered negative multiplicities",
                    );
                    format!(
                        "Invalid data in source, \
                         saw retractions ({}) for row that does not exist: {:?}",
                        -d, row,
                    )
                })?;
                // Rows with zero multiplicity contribute nothing.
                let Some(count) = NonZeroUsize::new(count) else {
                    continue;
                };
                let mut datum_local = datum_vec.borrow_with(&row);
                // Apply the MFP; rows filtered out yield `None`.
                let eval_result = mfp_plan
                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
                    .map(|row| row.cloned())
                    .map_err(|e| e.to_string())?;
                if let Some(row) = eval_result {
                    // Track the accumulated result size and bail if it exceeds
                    // the configured maximum.
                    total_size = total_size
                        .saturating_add(row.byte_len())
                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
                    if total_size > max_result_size {
                        return Err(format!(
                            "result exceeds max size of {}",
                            ByteSize::b(u64::cast_from(max_result_size))
                        ));
                    }
                    result.push((row, count));
                    limit_remaining = limit_remaining.saturating_sub(count.get());
                    if limit_remaining == 0 {
                        break;
                    }
                }
            }
        }

        Ok(result)
    }
}
1366
/// An in-progress index-backed peek, and data to eventually fulfill it.
///
/// The peek is fulfilled by reading from the traces in `trace_bundle` once
/// their frontiers have passed the peek timestamp (see
/// [`IndexPeek::seek_fulfillment`]).
pub struct IndexPeek {
    peek: Peek,
    /// The data from which the trace derives.
    trace_bundle: TraceBundle,
    /// The `tracing::Span` tracking this peek's operation
    span: tracing::Span,
}
1375
/// Histogram metrics for index peek phases.
///
/// This struct bundles references to the various histogram metrics used to
/// instrument the index peek processing pipeline.
pub(crate) struct IndexPeekMetrics<'a> {
    /// Total time spent in `IndexPeek::seek_fulfillment`.
    pub seek_fulfillment_seconds: &'a prometheus::Histogram,
    /// Time spent checking trace frontiers for peek readiness.
    pub frontier_check_seconds: &'a prometheus::Histogram,
    /// Time spent scanning the error trace for errors at the peek time.
    pub error_scan_seconds: &'a prometheus::Histogram,
    // The following phase timings are observed while collecting ok data
    // (`collect_ok_finished_data`); the names correspond to the phases there.
    pub cursor_setup_seconds: &'a prometheus::Histogram,
    pub row_iteration_seconds: &'a prometheus::Histogram,
    pub result_sort_seconds: &'a prometheus::Histogram,
    pub row_collection_seconds: &'a prometheus::Histogram,
}
1389
1390impl IndexPeek {
1391    /// Attempts to fulfill the peek and reports success.
1392    ///
1393    /// To produce output at `peek.timestamp`, we must be certain that
1394    /// it is no longer changing. A trace guarantees that all future
1395    /// changes will be greater than or equal to an element of `upper`.
1396    ///
1397    /// If an element of `upper` is less or equal to `peek.timestamp`,
1398    /// then there can be further updates that would change the output.
1399    /// If no element of `upper` is less or equal to `peek.timestamp`,
1400    /// then for any time `t` less or equal to `peek.timestamp` it is
1401    /// not the case that `upper` is less or equal to that timestamp,
1402    /// and so the result cannot further evolve.
1403    fn seek_fulfillment(
1404        &mut self,
1405        upper: &mut Antichain<Timestamp>,
1406        max_result_size: u64,
1407        peek_stash_eligible: bool,
1408        peek_stash_threshold_bytes: usize,
1409        metrics: &IndexPeekMetrics<'_>,
1410    ) -> PeekStatus {
1411        let method_start = Instant::now();
1412
1413        self.trace_bundle.oks_mut().read_upper(upper);
1414        if upper.less_equal(&self.peek.timestamp) {
1415            return PeekStatus::NotReady;
1416        }
1417        self.trace_bundle.errs_mut().read_upper(upper);
1418        if upper.less_equal(&self.peek.timestamp) {
1419            return PeekStatus::NotReady;
1420        }
1421
1422        let read_frontier = self.trace_bundle.compaction_frontier();
1423        if !read_frontier.less_equal(&self.peek.timestamp) {
1424            let error = format!(
1425                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
1426                read_frontier.elements(),
1427                self.peek.timestamp,
1428            );
1429            return PeekStatus::Ready(PeekResponse::Error(error));
1430        }
1431
1432        metrics
1433            .frontier_check_seconds
1434            .observe(method_start.elapsed().as_secs_f64());
1435
1436        let result = self.collect_finished_data(
1437            max_result_size,
1438            peek_stash_eligible,
1439            peek_stash_threshold_bytes,
1440            metrics,
1441        );
1442
1443        metrics
1444            .seek_fulfillment_seconds
1445            .observe(method_start.elapsed().as_secs_f64());
1446
1447        result
1448    }
1449
    /// Collects data for a known-complete peek from the ok stream.
    ///
    /// First scans the error trace: any error with a positive multiplicity at
    /// the peek time is returned as the peek response, and negative
    /// multiplicities (invalid data) are reported as errors as well. Only if
    /// the error trace is clean is the ok data collected.
    fn collect_finished_data(
        &mut self,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus {
        let error_scan_start = Instant::now();

        // Check if there exist any errors and, if so, return whatever one we
        // find first.
        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
        while cursor.key_valid(&storage) {
            // Accumulate this error's multiplicity at the peek time.
            let mut copies = Diff::ZERO;
            cursor.map_times(&storage, |time, diff| {
                if time.less_equal(&self.peek.timestamp) {
                    copies += diff;
                }
            });
            if copies.is_negative() {
                let error = cursor.key(&storage);
                error!(
                    target = %self.peek.target.id(), diff = %copies, %error,
                    "index peek encountered negative multiplicities in error trace",
                );
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "Invalid data in source errors, \
                    saw retractions ({}) for row that does not exist: {}",
                    -copies, error,
                )));
            }
            if copies.is_positive() {
                return PeekStatus::Ready(PeekResponse::Error(cursor.key(&storage).to_string()));
            }
            cursor.step_key(&storage);
        }

        metrics
            .error_scan_seconds
            .observe(error_scan_start.elapsed().as_secs_f64());

        // No errors found; collect the actual peek data from the ok trace.
        Self::collect_ok_finished_data(
            &self.peek,
            self.trace_bundle.oks_mut(),
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
            metrics,
        )
    }
1501
    /// Collects data for a known-complete peek from the ok stream.
    ///
    /// Iterates over the rows in `oks_handle` at `peek.timestamp` (applying
    /// the peek's map/filter/project and literal constraints through a
    /// `PeekResultIterator`) and assembles them into a `RowCollection`.
    ///
    /// Possible outcomes:
    /// * `PeekStatus::UsePeekStash` when `peek_stash_eligible` is set and the
    ///   accumulated result size exceeds `peek_stash_threshold_bytes`;
    /// * `PeekStatus::Ready` with an error response when the result exceeds
    ///   `max_result_size` or row extraction fails;
    /// * `PeekStatus::Ready` with the collected rows otherwise.
    fn collect_ok_finished_data<Tr>(
        peek: &Peek<Timestamp>,
        oks_handle: &mut Tr,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus
    where
        for<'a> Tr: TraceReader<
                Key<'a>: ToDatumIter + Eq,
                KeyOwn = Row,
                Val<'a>: ToDatumIter,
                TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
                DiffGat<'a> = &'a Diff,
            >,
    {
        let max_result_size = usize::cast_from(max_result_size);
        // Per-row overhead included in `total_size`: the in-memory size of the
        // `NonZeroUsize` copy count stored next to each row.
        let count_byte_size = size_of::<NonZeroUsize>();

        // Cursor setup timing
        let cursor_setup_start = Instant::now();

        // We clone `literal_constraints` here because we don't want to move the constraints
        // out of the peek struct, and don't want to modify in-place.
        let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
            peek.target.id().clone(),
            peek.map_filter_project.clone(),
            peek.timestamp,
            peek.literal_constraints.clone().as_deref_mut(),
            oks_handle,
        );

        metrics
            .cursor_setup_seconds
            .observe(cursor_setup_start.elapsed().as_secs_f64());

        // Accumulated `Vec<(row, count)>` results that we are likely to return.
        let mut results = Vec::new();
        // Running estimate of the result's size in bytes, used by the peek
        // stash and max-result-size checks in the loop below.
        let mut total_size: usize = 0;

        // When set, a bound on the number of records we need to return.
        // The requirements on the records are driven by the finishing's
        // `order_by` field. Further limiting will happen when the results
        // are collected, so we don't need to have exactly this many results,
        // just at least those results that would have been returned.
        let max_results = peek.finishing.num_rows_needed();

        // Scratch space for unpacking rows during the comparison in the sort
        // further down; allocated once and reused across comparisons.
        let mut l_datum_vec = DatumVec::new();
        let mut r_datum_vec = DatumVec::new();

        // Row iteration timing
        let row_iteration_start = Instant::now();
        let mut sort_time_accum = Duration::ZERO;

        while let Some(row) = peek_iterator.next() {
            let row = match row {
                Ok(row) => row,
                Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)),
            };
            let (row, copies) = row;
            // NOTE(review): presumably the iterator only yields rows with
            // positive multiplicity, making this conversion infallible —
            // confirm against `PeekResultIterator`.
            let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");

            total_size = total_size
                .saturating_add(row.byte_len())
                .saturating_add(count_byte_size);
            // Hand off to the peek result stash once the result grows beyond
            // the configured threshold, if this peek is eligible for it.
            if peek_stash_eligible && total_size > peek_stash_threshold_bytes {
                return PeekStatus::UsePeekStash;
            }
            if total_size > max_result_size {
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "result exceeds max size of {}",
                    ByteSize::b(u64::cast_from(max_result_size))
                )));
            }

            results.push((row, copies));

            // If we hold many more than `max_results` records, we can thin down
            // `results` using `self.finishing.ordering`.
            if let Some(max_results) = max_results {
                // We use a threshold twice what we intend, to amortize the work
                // across all of the insertions. We could tighten this, but it
                // works for the moment.
                if results.len() >= 2 * max_results {
                    if peek.finishing.order_by.is_empty() {
                        // Without an ordering, the first `max_results` rows are
                        // as good as any: truncate and return immediately.
                        results.truncate(max_results);
                        metrics
                            .row_iteration_seconds
                            .observe(row_iteration_start.elapsed().as_secs_f64());
                        metrics
                            .result_sort_seconds
                            .observe(sort_time_accum.as_secs_f64());
                        let row_collection_start = Instant::now();
                        let collection = RowCollection::new(results, &peek.finishing.order_by);
                        metrics
                            .row_collection_seconds
                            .observe(row_collection_start.elapsed().as_secs_f64());
                        return PeekStatus::Ready(PeekResponse::Rows(vec![collection]));
                    } else {
                        // We can sort `results` and then truncate to `max_results`.
                        // This has an effect similar to a priority queue, without
                        // its interactive dequeueing properties.
                        // TODO: Had we left these as `Vec<Datum>` we would avoid
                        // the unpacking; we should consider doing that, although
                        // it will require a re-pivot of the code to branch on this
                        // inner test (as we prefer not to maintain `Vec<Datum>`
                        // in the other case).
                        let sort_start = Instant::now();
                        results.sort_by(|left, right| {
                            let left_datums = l_datum_vec.borrow_with(&left.0);
                            let right_datums = r_datum_vec.borrow_with(&right.0);
                            mz_expr::compare_columns(
                                &peek.finishing.order_by,
                                &left_datums,
                                &right_datums,
                                || left.0.cmp(&right.0),
                            )
                        });
                        sort_time_accum += sort_start.elapsed();
                        // Subtract the bytes freed by dropping the tail so the
                        // size checks above stay accurate.
                        let dropped = results.drain(max_results..);
                        let dropped_size =
                            dropped.into_iter().fold(0, |acc: usize, (row, _count)| {
                                acc.saturating_add(row.byte_len().saturating_add(count_byte_size))
                            });
                        total_size = total_size.saturating_sub(dropped_size);
                    }
                }
            }
        }

        metrics
            .row_iteration_seconds
            .observe(row_iteration_start.elapsed().as_secs_f64());
        metrics
            .result_sort_seconds
            .observe(sort_time_accum.as_secs_f64());

        let row_collection_start = Instant::now();
        let collection = RowCollection::new(results, &peek.finishing.order_by);
        metrics
            .row_collection_seconds
            .observe(row_collection_start.elapsed().as_secs_f64());
        PeekStatus::Ready(PeekResponse::Rows(vec![collection]))
    }
1648}
1649
/// For keeping track of the state of pending or ready peeks, and managing
/// control flow.
///
/// Produced by the peek fulfillment path to signal whether a peek can be
/// answered now, must be retried later, or should be routed through the peek
/// result stash.
enum PeekStatus {
    /// The frontiers of objects are not yet advanced enough, peek is still
    /// pending.
    NotReady,
    /// The result size is above the configured threshold and the peek is
    /// eligible for using the peek result stash.
    UsePeekStash,
    /// The peek result is ready.
    Ready(PeekResponse),
}
1662
/// The frontiers we have reported to the controller for a collection.
///
/// One [`ReportedFrontier`] per frontier kind the compute protocol reports.
#[derive(Debug)]
struct ReportedFrontiers {
    /// The reported write frontier.
    write_frontier: ReportedFrontier,
    /// The reported input frontier.
    input_frontier: ReportedFrontier,
    /// The reported output frontier.
    output_frontier: ReportedFrontier,
}
1673
1674impl ReportedFrontiers {
1675    /// Creates a new `ReportedFrontiers` instance.
1676    fn new() -> Self {
1677        Self {
1678            write_frontier: ReportedFrontier::new(),
1679            input_frontier: ReportedFrontier::new(),
1680            output_frontier: ReportedFrontier::new(),
1681        }
1682    }
1683}
1684
/// A frontier we have reported to the controller, or the least frontier we are allowed to report.
#[derive(Clone, Debug)]
pub enum ReportedFrontier {
    /// A frontier has been previously reported.
    Reported(Antichain<Timestamp>),
    /// No frontier has been reported yet.
    NotReported {
        /// A lower bound for frontiers that may be reported in the future.
        lower: Antichain<Timestamp>,
    },
}
1696
1697impl ReportedFrontier {
1698    /// Create a new `ReportedFrontier` enforcing the minimum lower bound.
1699    pub fn new() -> Self {
1700        let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
1701        Self::NotReported { lower }
1702    }
1703
1704    /// Whether the reported frontier is the empty frontier.
1705    pub fn is_empty(&self) -> bool {
1706        match self {
1707            Self::Reported(frontier) => frontier.is_empty(),
1708            Self::NotReported { .. } => false,
1709        }
1710    }
1711
1712    /// Whether this `ReportedFrontier` allows reporting the given frontier.
1713    ///
1714    /// A `ReportedFrontier` allows reporting of another frontier if:
1715    ///  * The other frontier is greater than the reported frontier.
1716    ///  * The other frontier is greater than or equal to the lower bound.
1717    fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
1718        match self {
1719            Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
1720            Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
1721        }
1722    }
1723}
1724
/// State maintained for a compute collection.
pub struct CollectionState {
    /// Tracks the frontiers that have been reported to the controller.
    reported_frontiers: ReportedFrontiers,
    /// The index of the dataflow computing this collection.
    ///
    /// Used for dropping the dataflow when the collection is dropped.
    /// The Dataflow index is wrapped in an `Rc`s and can be shared between collections, to reflect
    /// the possibility that a single dataflow can export multiple collections.
    dataflow_index: Rc<usize>,
    /// Whether this collection is a subscribe or copy-to.
    ///
    /// The compute protocol does not allow `Frontiers` responses for subscribe and copy-to
    /// collections, so we need to be able to recognize them. This is something we would like to
    /// change in the future (database-issues#4701).
    pub is_subscribe_or_copy: bool,
    /// The collection's initial as-of frontier.
    ///
    /// Used to determine hydration status: the collection counts as hydrated
    /// once its reported output frontier has advanced beyond this as-of.
    as_of: Antichain<Timestamp>,

    /// A token that should be dropped when this collection is dropped to clean up associated
    /// sink state.
    ///
    /// Only `Some` if the collection is a sink.
    pub sink_token: Option<SinkToken>,
    /// Frontier of sink writes.
    ///
    /// Only `Some` if the collection is a sink and *not* a subscribe.
    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
    /// Frontier probes for every input to the collection.
    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
    /// A probe reporting the frontier of times through which all collection outputs have been
    /// computed (but not necessarily written).
    ///
    /// `None` for collections with compute frontiers equal to their write frontiers.
    pub compute_probe: Option<probe::Handle<Timestamp>>,
    /// Logging state maintained for this collection.
    logging: Option<CollectionLogging>,
    /// Metrics tracked for this collection.
    metrics: CollectionMetrics,
    /// Send-side to transition a dataflow from read-only mode to read-write mode.
    ///
    /// All dataflows start in read-only mode. Only after receiving a
    /// `AllowWrites` command from the controller will they transition to
    /// read-write mode.
    ///
    /// A dataflow in read-only mode must not affect any external state.
    ///
    /// NOTE: In the future, we might want a more complicated flag, for example
    /// something that tells us after which timestamp we are allowed to write.
    /// In this first version we are keeping things as simple as possible!
    read_only_tx: watch::Sender<bool>,
    /// Receive-side to observe whether a dataflow is in read-only mode.
    pub read_only_rx: watch::Receiver<bool>,
}
1781
1782impl CollectionState {
1783    fn new(
1784        dataflow_index: Rc<usize>,
1785        is_subscribe_or_copy: bool,
1786        as_of: Antichain<Timestamp>,
1787        metrics: CollectionMetrics,
1788    ) -> Self {
1789        // We always initialize as read_only=true. Only when we're explicitly
1790        // allowed to we switch to read-write.
1791        let (read_only_tx, read_only_rx) = watch::channel(true);
1792
1793        Self {
1794            reported_frontiers: ReportedFrontiers::new(),
1795            dataflow_index,
1796            is_subscribe_or_copy,
1797            as_of,
1798            sink_token: None,
1799            sink_write_frontier: None,
1800            input_probes: Default::default(),
1801            compute_probe: None,
1802            logging: None,
1803            metrics,
1804            read_only_tx,
1805            read_only_rx,
1806        }
1807    }
1808
1809    /// Return the frontiers that have been reported to the controller.
1810    fn reported_frontiers(&self) -> &ReportedFrontiers {
1811        &self.reported_frontiers
1812    }
1813
1814    /// Reset all reported frontiers to the given value.
1815    pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
1816        self.reported_frontiers.write_frontier = frontier.clone();
1817        self.reported_frontiers.input_frontier = frontier.clone();
1818        self.reported_frontiers.output_frontier = frontier;
1819    }
1820
1821    /// Set the write frontier that has been reported to the controller.
1822    fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
1823        if let Some(logging) = &mut self.logging {
1824            let time = match &frontier {
1825                ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
1826                ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
1827            };
1828            logging.set_frontier(time);
1829        }
1830
1831        self.reported_frontiers.write_frontier = frontier;
1832    }
1833
1834    /// Set the input frontier that has been reported to the controller.
1835    fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
1836        // Use this opportunity to update our input frontier logging.
1837        if let Some(logging) = &mut self.logging {
1838            for (id, probe) in &self.input_probes {
1839                let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
1840                logging.set_import_frontier(*id, new_time);
1841            }
1842        }
1843
1844        self.reported_frontiers.input_frontier = frontier;
1845    }
1846
1847    /// Set the output frontier that has been reported to the controller.
1848    fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
1849        let already_hydrated = self.hydrated();
1850
1851        self.reported_frontiers.output_frontier = frontier;
1852
1853        if !already_hydrated && self.hydrated() {
1854            if let Some(logging) = &mut self.logging {
1855                logging.set_hydrated();
1856            }
1857            self.metrics.record_collection_hydrated();
1858        }
1859    }
1860
1861    /// Return whether this collection is hydrated.
1862    fn hydrated(&self) -> bool {
1863        match &self.reported_frontiers.output_frontier {
1864            ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
1865            ReportedFrontier::NotReported { .. } => false,
1866        }
1867    }
1868
1869    /// Allow writes for this collection.
1870    fn allow_writes(&self) {
1871        info!(
1872            dataflow_index = *self.dataflow_index,
1873            export = ?self.logging.as_ref().map(|l| l.export_id()),
1874            "allowing writes for dataflow",
1875        );
1876        let _ = self.read_only_tx.send(false);
1877    }
1878}