// mz_compute/src/compute_state.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

//! Worker-local state for compute timely instances.

8use std::any::Any;
9use std::cell::RefCell;
10use std::cmp::Ordering;
11use std::collections::{BTreeMap, BTreeSet};
12use std::num::NonZeroUsize;
13use std::rc::Rc;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use bytesize::ByteSize;
18use differential_dataflow::Hashable;
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::{Cursor, TraceReader};
21use mz_compute_client::logging::LoggingConfig;
22use mz_compute_client::protocol::command::{
23    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
24};
25use mz_compute_client::protocol::history::ComputeCommandHistory;
26use mz_compute_client::protocol::response::{
27    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, SubscribeResponse,
28};
29use mz_compute_types::dataflows::DataflowDescription;
30use mz_compute_types::dyncfgs::{
31    ENABLE_PEEK_RESPONSE_STASH, PEEK_RESPONSE_STASH_BATCH_MAX_RUNS,
32    PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE, PEEK_STASH_NUM_BATCHES,
33};
34use mz_compute_types::plan::render_plan::RenderPlan;
35use mz_dyncfg::ConfigSet;
36use mz_expr::row::RowCollection;
37use mz_expr::{RowComparator, SafeMfpPlan};
38use mz_ore::cast::CastFrom;
39use mz_ore::collections::CollectionExt;
40use mz_ore::metrics::{MetricsRegistry, UIntGauge};
41use mz_ore::now::EpochMillis;
42use mz_ore::soft_panic_or_log;
43use mz_ore::task::AbortOnDropHandle;
44use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
45use mz_persist_client::Diagnostics;
46use mz_persist_client::cache::PersistClientCache;
47use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
48use mz_persist_client::read::ReadHandle;
49use mz_persist_types::PersistLocation;
50use mz_persist_types::codec_impls::UnitSchema;
51use mz_repr::fixed_length::ToDatumIter;
52use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
53use mz_storage_operators::stats::StatsCursor;
54use mz_storage_types::StorageDiff;
55use mz_storage_types::controller::CollectionMetadata;
56use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
57use mz_storage_types::sources::SourceData;
58use mz_storage_types::time_dependence::TimeDependence;
59use mz_txn_wal::operator::TxnsContext;
60use mz_txn_wal::txn_cache::TxnsCache;
61use timely::communication::Allocate;
62use timely::dataflow::operators::probe;
63use timely::order::PartialOrder;
64use timely::progress::frontier::Antichain;
65use timely::scheduling::Scheduler;
66use timely::worker::Worker as TimelyWorker;
67use tokio::sync::{oneshot, watch};
68use tracing::{Level, debug, error, info, span, trace, warn};
69use uuid::Uuid;
70
71use crate::arrangement::manager::{TraceBundle, TraceManager};
72use crate::logging;
73use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
74use crate::logging::initialize::LoggingTraces;
75use crate::metrics::{CollectionMetrics, WorkerMetrics};
76use crate::render::{LinearJoinSpec, StartSignal};
77use crate::server::{ComputeInstanceContext, ResponseSender};
78
79mod peek_result_iterator;
80mod peek_stash;
81
82/// Worker-local state that is maintained across dataflows.
83///
84/// This state is restricted to the COMPUTE state, the deterministic, idempotent work
85/// done between data ingress and egress.
86pub struct ComputeState {
87    /// State kept for each installed compute collection.
88    ///
89    /// Each collection has exactly one frontier.
90    /// How the frontier is communicated depends on the collection type:
91    ///  * Frontiers of indexes are equal to the frontier of their corresponding traces in the
92    ///    `TraceManager`.
93    ///  * Persist sinks store their current frontier in `CollectionState::sink_write_frontier`.
94    ///  * Subscribes report their frontiers through the `subscribe_response_buffer`.
95    pub collections: BTreeMap<GlobalId, CollectionState>,
96    /// The traces available for sharing across dataflows.
97    pub traces: TraceManager,
98    /// Shared buffer with SUBSCRIBE operator instances by which they can respond.
99    ///
100    /// The entries are pairs of sink identifier (to identify the subscribe instance)
101    /// and the response itself.
102    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
103    /// Shared buffer with S3 oneshot operator instances by which they can respond.
104    ///
105    /// The entries are pairs of sink identifier (to identify the s3 oneshot instance)
106    /// and the response itself.
107    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
108    /// Peek commands that are awaiting fulfillment.
109    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
110    /// The persist location where we can stash large peek results.
111    pub peek_stash_persist_location: Option<PersistLocation>,
112    /// The logger, from Timely's logging framework, if logs are enabled.
113    pub compute_logger: Option<logging::compute::Logger>,
114    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
115    /// This is intentionally shared between workers.
116    pub persist_clients: Arc<PersistClientCache>,
117    /// Context necessary for rendering txn-wal operators.
118    pub txns_ctx: TxnsContext,
119    /// History of commands received by this workers and all its peers.
120    pub command_history: ComputeCommandHistory<UIntGauge>,
121    /// Max size in bytes of any result.
122    max_result_size: u64,
123    /// Specification for rendering linear joins.
124    pub linear_join_spec: LinearJoinSpec,
125    /// Metrics for this worker.
126    pub metrics: WorkerMetrics,
127    /// A process-global handle to tracing configuration.
128    tracing_handle: Arc<TracingHandle>,
129    /// Other configuration for compute
130    pub context: ComputeInstanceContext,
131    /// Per-worker dynamic configuration.
132    ///
133    /// This is separate from the process-global `ConfigSet` and contains config options that need
134    /// to be applied consistently with compute command order.
135    ///
136    /// For example, for options that influence dataflow rendering it is important that all workers
137    /// render the same dataflow with the same options. If these options were stored in a global
138    /// `ConfigSet`, we couldn't guarantee that all workers observe changes to them at the same
139    /// point in the stream of compute commands. Storing per-worker configuration ensures that
140    /// because each worker's configuration is only updated once that worker observes the
141    /// respective `UpdateConfiguration` command.
142    ///
143    /// Reference-counted to avoid cloning for `Context`.
144    pub worker_config: Rc<ConfigSet>,
145
146    /// The process-global metrics registry.
147    pub metrics_registry: MetricsRegistry,
148
149    /// The number of timely workers per process.
150    pub workers_per_process: usize,
151
152    /// Collections awaiting schedule instruction by the controller.
153    ///
154    /// Each entry stores a reference to a token that can be dropped to unsuspend the collection's
155    /// dataflow. Multiple collections can reference the same token if they are exported by the
156    /// same dataflow.
157    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,
158
159    /// Interval at which to perform server maintenance tasks. Set to a zero interval to
160    /// perform maintenance with every `step_or_park` invocation.
161    pub server_maintenance_interval: Duration,
162
163    /// The [`mz_ore::now::SYSTEM_TIME`] at which the replica was started.
164    ///
165    /// Used to compute `replica_expiration`.
166    pub init_system_time: EpochMillis,
167
168    /// The maximum time for which the replica is expected to live. If not empty, dataflows in the
169    /// replica can drop diffs associated with timestamps beyond the replica expiration.
170    /// The replica will panic if such dataflows are not dropped before the replica has expired.
171    pub replica_expiration: Antichain<Timestamp>,
172}
173
174impl ComputeState {
175    /// Construct a new `ComputeState`.
176    pub fn new(
177        persist_clients: Arc<PersistClientCache>,
178        txns_ctx: TxnsContext,
179        metrics: WorkerMetrics,
180        tracing_handle: Arc<TracingHandle>,
181        context: ComputeInstanceContext,
182        metrics_registry: MetricsRegistry,
183        workers_per_process: usize,
184    ) -> Self {
185        let traces = TraceManager::new(metrics.clone());
186        let command_history = ComputeCommandHistory::new(metrics.for_history());
187
188        Self {
189            collections: Default::default(),
190            traces,
191            subscribe_response_buffer: Default::default(),
192            copy_to_response_buffer: Default::default(),
193            pending_peeks: Default::default(),
194            peek_stash_persist_location: None,
195            compute_logger: None,
196            persist_clients,
197            txns_ctx,
198            command_history,
199            max_result_size: u64::MAX,
200            linear_join_spec: Default::default(),
201            metrics,
202            tracing_handle,
203            context,
204            worker_config: mz_dyncfgs::all_dyncfgs().into(),
205            metrics_registry,
206            workers_per_process,
207            suspended_collections: Default::default(),
208            server_maintenance_interval: Duration::ZERO,
209            init_system_time: mz_ore::now::SYSTEM_TIME(),
210            replica_expiration: Antichain::default(),
211        }
212    }
213
214    /// Return a mutable reference to the identified collection.
215    ///
216    /// Panics if the collection doesn't exist.
217    pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
218        self.collections
219            .get_mut(&id)
220            .expect("collection must exist")
221    }
222
223    /// Construct a new frontier probe for the given input and add it to the state of the given
224    /// collections.
225    ///
226    /// The caller is responsible for attaching the returned probe handle to the respective
227    /// dataflow input stream.
228    pub fn input_probe_for(
229        &mut self,
230        input_id: GlobalId,
231        collection_ids: impl Iterator<Item = GlobalId>,
232    ) -> probe::Handle<Timestamp> {
233        let probe = probe::Handle::default();
234        for id in collection_ids {
235            if let Some(collection) = self.collections.get_mut(&id) {
236                collection.input_probes.insert(input_id, probe.clone());
237            }
238        }
239        probe
240    }
241
242    /// Apply the current `worker_config` to the compute state.
243    fn apply_worker_config(&mut self) {
244        use mz_compute_types::dyncfgs::*;
245
246        let config = &self.worker_config;
247
248        self.linear_join_spec = LinearJoinSpec::from_config(config);
249
250        if ENABLE_LGALLOC.get(config) {
251            if let Some(path) = &self.context.scratch_directory {
252                let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
253                let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
254                let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
255                let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
256                let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
257                info!(
258                    ?path,
259                    backgrund_interval=?interval,
260                    clear_bytes,
261                    eager_return,
262                    file_growth_dampener,
263                    local_buffer_bytes,
264                    "enabling lgalloc"
265                );
266                let background_worker_config = lgalloc::BackgroundWorkerConfig {
267                    interval,
268                    clear_bytes,
269                };
270                lgalloc::lgalloc_set_config(
271                    lgalloc::LgAlloc::new()
272                        .enable()
273                        .with_path(path.clone())
274                        .with_background_config(background_worker_config)
275                        .eager_return(eager_return)
276                        .file_growth_dampener(file_growth_dampener)
277                        .local_buffer_bytes(local_buffer_bytes),
278                );
279            } else {
280                debug!("not enabling lgalloc, scratch directory not specified");
281            }
282        } else {
283            info!("disabling lgalloc");
284            lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
285        }
286
287        crate::memory_limiter::apply_limiter_config(config);
288
289        mz_ore::region::ENABLE_LGALLOC_REGION.store(
290            ENABLE_COLUMNATION_LGALLOC.get(config),
291            std::sync::atomic::Ordering::Relaxed,
292        );
293
294        let enable_columnar_lgalloc = ENABLE_COLUMNAR_LGALLOC.get(config);
295        mz_timely_util::containers::set_enable_columnar_lgalloc(enable_columnar_lgalloc);
296
297        // Remember the maintenance interval locally to avoid reading it from the config set on
298        // every server iteration.
299        self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);
300
301        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
302        match overflowing_behavior.parse() {
303            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
304            Err(err) => {
305                error!(
306                    err,
307                    overflowing_behavior, "Invalid value for ore_overflowing_behavior"
308                );
309            }
310        }
311    }
312
313    /// Apply the provided replica expiration `offset` by converting it to a frontier relative to
314    /// the replica's initialization system time.
315    ///
316    /// Only expected to be called once when creating the instance. Guards against calling it
317    /// multiple times by checking if the local expiration time is set.
318    pub fn apply_expiration_offset(&mut self, offset: Duration) {
319        if self.replica_expiration.is_empty() {
320            let offset: EpochMillis = offset
321                .as_millis()
322                .try_into()
323                .expect("duration must fit within u64");
324            let replica_expiration_millis = self.init_system_time + offset;
325            let replica_expiration = Timestamp::from(replica_expiration_millis);
326
327            info!(
328                offset = %offset,
329                replica_expiration_millis = %replica_expiration_millis,
330                replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
331                "setting replica expiration",
332            );
333            self.replica_expiration = Antichain::from_elem(replica_expiration);
334
335            // Record the replica expiration in the metrics.
336            self.metrics
337                .replica_expiration_timestamp_seconds
338                .set(replica_expiration.into());
339        }
340    }
341
342    /// Returns the cc or non-cc version of "dataflow_max_inflight_bytes", as
343    /// appropriate to this replica.
344    pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
345        use mz_compute_types::dyncfgs::{
346            DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
347        };
348
349        if self.persist_clients.cfg.is_cc_active {
350            DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
351        } else {
352            DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
353        }
354    }
355}
356
357/// A wrapper around [ComputeState] with a live timely worker and response channel.
358pub(crate) struct ActiveComputeState<'a, A: Allocate> {
359    /// The underlying Timely worker.
360    pub timely_worker: &'a mut TimelyWorker<A>,
361    /// The compute state itself.
362    pub compute_state: &'a mut ComputeState,
363    /// The channel over which frontier information is reported.
364    pub response_tx: &'a mut ResponseSender,
365}
366
367/// A token that keeps a sink alive.
368pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);
369
370impl SinkToken {
371    /// Create a new `SinkToken`.
372    pub fn new(t: Box<dyn Any>) -> Self {
373        Self(t)
374    }
375}
376
377impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
378    /// Entrypoint for applying a compute command.
379    #[mz_ore::instrument(level = "debug")]
380    pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
381        use ComputeCommand::*;
382
383        self.compute_state.command_history.push(cmd.clone());
384
385        // Record the command duration, per worker and command kind.
386        let timer = self
387            .compute_state
388            .metrics
389            .handle_command_duration_seconds
390            .for_command(&cmd)
391            .start_timer();
392
393        match cmd {
394            Hello { .. } => panic!("Hello must be captured before"),
395            CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
396            InitializationComplete => (),
397            UpdateConfiguration(params) => self.handle_update_configuration(*params),
398            CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
399            Schedule(id) => self.handle_schedule(id),
400            AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
401            Peek(peek) => {
402                peek.otel_ctx.attach_as_parent();
403                self.handle_peek(*peek)
404            }
405            CancelPeek { uuid } => self.handle_cancel_peek(uuid),
406            AllowWrites(id) => {
407                self.handle_allow_writes(id);
408            }
409        }
410
411        timer.observe_duration();
412    }
413
414    fn handle_create_instance(&mut self, config: InstanceConfig) {
415        // Ensure the state is consistent with the config before we initialize anything.
416        self.compute_state.apply_worker_config();
417        if let Some(offset) = config.expiration_offset {
418            self.compute_state.apply_expiration_offset(offset);
419        }
420
421        self.initialize_logging(config.logging);
422
423        self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
424    }
425
426    fn handle_update_configuration(&mut self, params: ComputeParameters) {
427        debug!("Applying configuration update: {params:?}");
428
429        let ComputeParameters {
430            workload_class,
431            max_result_size,
432            tracing,
433            grpc_client: _grpc_client,
434            dyncfg_updates,
435        } = params;
436
437        if let Some(v) = workload_class {
438            self.compute_state.metrics.set_workload_class(v);
439        }
440        if let Some(v) = max_result_size {
441            self.compute_state.max_result_size = v;
442        }
443
444        tracing.apply(self.compute_state.tracing_handle.as_ref());
445
446        dyncfg_updates.apply(&self.compute_state.worker_config);
447        self.compute_state
448            .persist_clients
449            .cfg()
450            .apply_from(&dyncfg_updates);
451
452        // Note: We're only updating mz_metrics from the compute state here, but not from the
453        // equivalent storage state. This is because they're running on the same process and
454        // share the metrics.
455        mz_metrics::update_dyncfg(&dyncfg_updates);
456
457        self.compute_state.apply_worker_config();
458    }
459
460    fn handle_create_dataflow(
461        &mut self,
462        dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
463    ) {
464        let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());
465        let as_of = dataflow.as_of.clone().unwrap();
466
467        let dataflow_expiration = dataflow
468            .time_dependence
469            .as_ref()
470            .map(|time_dependence| {
471                self.determine_dataflow_expiration(time_dependence, &dataflow.until)
472            })
473            .unwrap_or_default();
474
475        // Add the dataflow expiration to `until`.
476        let until = dataflow.until.meet(&dataflow_expiration);
477
478        if dataflow.is_transient() {
479            debug!(
480                name = %dataflow.debug_name,
481                import_ids = %dataflow.display_import_ids(),
482                export_ids = %dataflow.display_export_ids(),
483                as_of = ?as_of.elements(),
484                time_dependence = ?dataflow.time_dependence,
485                expiration = ?dataflow_expiration.elements(),
486                expiration_datetime = ?dataflow_expiration
487                    .as_option()
488                    .map(|t| mz_ore::now::to_datetime(t.into())),
489                plan_until = ?dataflow.until.elements(),
490                until = ?until.elements(),
491                "creating dataflow",
492            );
493        } else {
494            info!(
495                name = %dataflow.debug_name,
496                import_ids = %dataflow.display_import_ids(),
497                export_ids = %dataflow.display_export_ids(),
498                as_of = ?as_of.elements(),
499                time_dependence = ?dataflow.time_dependence,
500                expiration = ?dataflow_expiration.elements(),
501                expiration_datetime = ?dataflow_expiration
502                    .as_option()
503                    .map(|t| mz_ore::now::to_datetime(t.into())),
504                plan_until = ?dataflow.until.elements(),
505                until = ?until.elements(),
506                "creating dataflow",
507            );
508        };
509
510        let subscribe_copy_ids: BTreeSet<_> = dataflow
511            .subscribe_ids()
512            .chain(dataflow.copy_to_ids())
513            .collect();
514
515        // Initialize compute and logging state for each object.
516        for object_id in dataflow.export_ids() {
517            let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
518            let metrics = self.compute_state.metrics.for_collection(object_id);
519            let mut collection = CollectionState::new(
520                Rc::clone(&dataflow_index),
521                is_subscribe_or_copy,
522                as_of.clone(),
523                metrics,
524            );
525
526            if let Some(logger) = self.compute_state.compute_logger.clone() {
527                let logging = CollectionLogging::new(
528                    object_id,
529                    logger,
530                    *dataflow_index,
531                    dataflow.import_ids(),
532                );
533                collection.logging = Some(logging);
534            }
535
536            collection.reset_reported_frontiers(ReportedFrontier::NotReported {
537                lower: as_of.clone(),
538            });
539
540            let existing = self.compute_state.collections.insert(object_id, collection);
541            if existing.is_some() {
542                error!(
543                    id = ?object_id,
544                    "existing collection for newly created dataflow",
545                );
546            }
547        }
548
549        let (start_signal, suspension_token) = StartSignal::new();
550        for id in dataflow.export_ids() {
551            self.compute_state
552                .suspended_collections
553                .insert(id, Rc::clone(&suspension_token));
554        }
555
556        crate::render::build_compute_dataflow(
557            self.timely_worker,
558            self.compute_state,
559            dataflow,
560            start_signal,
561            until,
562            dataflow_expiration,
563        );
564    }
565
566    fn handle_schedule(&mut self, id: GlobalId) {
567        // A `Schedule` command instructs us to begin dataflow computation for a collection, so
568        // we should unsuspend it by dropping the corresponding suspension token. Note that a
569        // dataflow can export multiple collections and they all share one suspension token, so the
570        // computation of a dataflow will only start once all its exported collections have been
571        // scheduled.
572        let suspension_token = self.compute_state.suspended_collections.remove(&id);
573        drop(suspension_token);
574    }
575
576    fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
577        if frontier.is_empty() {
578            // Indicates that we may drop `id`, as there are no more valid times to read.
579            self.drop_collection(id);
580        } else {
581            self.compute_state
582                .traces
583                .allow_compaction(id, frontier.borrow());
584        }
585    }
586
587    #[mz_ore::instrument(level = "debug")]
588    fn handle_peek(&mut self, peek: Peek) {
589        let pending = match &peek.target {
590            PeekTarget::Index { id } => {
591                // Acquire a copy of the trace suitable for fulfilling the peek.
592                let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
593                PendingPeek::index(peek, trace_bundle)
594            }
595            PeekTarget::Persist { metadata, .. } => {
596                let metadata = metadata.clone();
597                PendingPeek::persist(
598                    peek,
599                    Arc::clone(&self.compute_state.persist_clients),
600                    metadata,
601                    usize::cast_from(self.compute_state.max_result_size),
602                    self.timely_worker,
603                )
604            }
605        };
606
607        // Log the receipt of the peek.
608        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
609            logger.log(&pending.as_log_event(true));
610        }
611
612        self.process_peek(&mut Antichain::new(), pending);
613    }
614
615    fn handle_cancel_peek(&mut self, uuid: Uuid) {
616        if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
617            self.send_peek_response(peek, PeekResponse::Canceled);
618        }
619    }
620
621    fn handle_allow_writes(&mut self, id: GlobalId) {
622        // Enable persist compaction on any allow-writes command. We
623        // assume persist only compacts after making durable changes,
624        // such as appending a batch or advancing the upper.
625        self.compute_state.persist_clients.cfg().enable_compaction();
626
627        if let Some(collection) = self.compute_state.collections.get_mut(&id) {
628            collection.allow_writes();
629        } else {
630            soft_panic_or_log!("allow writes for unknown collection {id}");
631        }
632    }
633
634    /// Drop the given collection.
635    fn drop_collection(&mut self, id: GlobalId) {
636        let collection = self
637            .compute_state
638            .collections
639            .remove(&id)
640            .expect("dropped untracked collection");
641
642        // If this collection is an index, remove its trace.
643        self.compute_state.traces.remove(&id);
644        // If the collection is unscheduled, remove it from the list of waiting collections.
645        self.compute_state.suspended_collections.remove(&id);
646
647        // Drop the dataflow, if all its exports have been dropped.
648        if let Ok(index) = Rc::try_unwrap(collection.dataflow_index) {
649            self.timely_worker.drop_dataflow(index);
650        }
651
652        // The compute protocol requires us to send a `Frontiers` response with empty frontiers
653        // when a collection was dropped, unless:
654        //  * The frontier was already reported as empty previously, or
655        //  * The collection is a subscribe or copy-to.
656        if !collection.is_subscribe_or_copy {
657            let reported = collection.reported_frontiers;
658            let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
659            let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
660            let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);
661
662            let frontiers = FrontiersResponse {
663                write_frontier,
664                input_frontier,
665                output_frontier,
666            };
667            if frontiers.has_updates() {
668                self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
669            }
670        }
671    }
672
673    /// Initializes timely dataflow logging and publishes as a view.
674    pub fn initialize_logging(&mut self, config: LoggingConfig) {
675        if self.compute_state.compute_logger.is_some() {
676            panic!("dataflow server has already initialized logging");
677        }
678
679        let LoggingTraces {
680            traces,
681            dataflow_index,
682            compute_logger: logger,
683        } = logging::initialize(
684            self.timely_worker,
685            &config,
686            self.compute_state.metrics_registry.clone(),
687            Rc::clone(&self.compute_state.worker_config),
688            self.compute_state.workers_per_process,
689        );
690
691        let dataflow_index = Rc::new(dataflow_index);
692        let mut log_index_ids = config.index_logs;
693        for (log, trace) in traces {
694            // Install trace as maintained index.
695            let id = log_index_ids
696                .remove(&log)
697                .expect("`logging::initialize` does not invent logs");
698            self.compute_state.traces.set(id, trace);
699
700            // Initialize compute and logging state for the logging index.
701            let is_subscribe_or_copy = false;
702            let as_of = Antichain::from_elem(Timestamp::MIN);
703            let metrics = self.compute_state.metrics.for_collection(id);
704            let mut collection = CollectionState::new(
705                Rc::clone(&dataflow_index),
706                is_subscribe_or_copy,
707                as_of,
708                metrics,
709            );
710
711            let logging =
712                CollectionLogging::new(id, logger.clone(), *dataflow_index, std::iter::empty());
713            collection.logging = Some(logging);
714
715            let existing = self.compute_state.collections.insert(id, collection);
716            if existing.is_some() {
717                error!(
718                    id = ?id,
719                    "existing collection for newly initialized logging export",
720                );
721            }
722        }
723
724        // Sanity check.
725        assert!(
726            log_index_ids.is_empty(),
727            "failed to create requested logging indexes: {log_index_ids:?}",
728        );
729
730        self.compute_state.compute_logger = Some(logger);
731    }
732
    /// Send progress information to the controller.
    ///
    /// For each tracked collection, computes the current write, input, and output
    /// frontiers and emits a `Frontiers` response for every collection where at
    /// least one of them advanced since the last report. Subscribe and copy-to
    /// collections are excluded (see below).
    pub fn report_frontiers(&mut self) {
        let mut responses = Vec::new();

        // Maintain a single allocation for `new_frontier` to avoid allocating on every iteration.
        let mut new_frontier = Antichain::new();

        for (&id, collection) in self.compute_state.collections.iter_mut() {
            // The compute protocol does not allow `Frontiers` responses for subscribe and copy-to
            // collections (database-issues#4701).
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers();

            // Collect the write frontier and check for progress.
            //
            // The write frontier comes either from the collection's maintained trace (indexes)
            // or from its sink write frontier; a collection must have exactly one of the two.
            new_frontier.clear();
            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
                assert!(
                    collection.sink_write_frontier.is_none(),
                    "collection {id} has multiple frontiers"
                );
                traces.oks_mut().read_upper(&mut new_frontier);
            } else if let Some(frontier) = &collection.sink_write_frontier {
                new_frontier.clone_from(&frontier.borrow());
            } else {
                error!(id = ?id, "collection without write frontier");
                continue;
            }
            let new_write_frontier = reported
                .write_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Collect the output frontier and check for progress.
            //
            // By default, the output frontier equals the write frontier (which is still stored in
            // `new_frontier`). If the collection provides a compute frontier, we construct the
            // output frontier by taking the meet of write and compute frontier, to avoid:
            //  * reporting progress through times we have not yet written
            //  * reporting progress through times we have not yet fully processed, for
            //    collections that jump their write frontiers into the future
            //
            // As a special case, in read-only mode we don't take the write frontier into account.
            // The dataflow doesn't have the ability to push it forward, so it can't be used as a
            // measure of dataflow progress.
            if let Some(probe) = &collection.compute_probe {
                if *collection.read_only_rx.borrow() {
                    new_frontier.clear();
                }
                // Extending the antichain with the probe's elements yields the meet.
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_output_frontier = reported
                .output_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Collect the input frontier (the meet of all input probe frontiers) and check for
            // progress.
            new_frontier.clear();
            for probe in collection.input_probes.values() {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_input_frontier = reported
                .input_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Remember what we are about to report, so future reports only reflect further
            // progress.
            if let Some(frontier) = &new_write_frontier {
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_input_frontier {
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_output_frontier {
                collection
                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
            }

            let response = FrontiersResponse {
                write_frontier: new_write_frontier,
                input_frontier: new_input_frontier,
                output_frontier: new_output_frontier,
            };
            if response.has_updates() {
                responses.push((id, response));
            }
        }

        // Send the responses only after the loop, since the loop holds a mutable borrow of
        // `self.compute_state.collections`.
        for (id, frontiers) in responses {
            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
        }
    }
828
829    /// Report per-worker metrics.
830    pub(crate) fn report_metrics(&self) {
831        if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
832            let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
833            let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
834            let remaining = expiration - now;
835            self.compute_state
836                .metrics
837                .replica_expiration_remaining_seconds
838                .set(remaining)
839        }
840    }
841
    /// Either complete the peek (and send the response) or put it in the pending set.
    ///
    /// `upper` is scratch space for reading trace frontiers; its contents on entry are
    /// irrelevant (it is overwritten via `read_upper` inside `seek_fulfillment`).
    ///
    /// Index peeks are attempted synchronously against the trace. If the result turns
    /// out to be too large (`PeekStatus::UsePeekStash`), the peek is converted into a
    /// `Stash` peek whose result is uploaded to persist by a background task. Persist
    /// and stash peeks complete asynchronously, so here we only poll their result
    /// channels.
    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
        let response = match &mut peek {
            PendingPeek::Index(peek) => {
                let start = Instant::now();

                // Only peeks whose `finishing` is streamable may use the peek stash.
                let peek_stash_eligible = peek
                    .peek
                    .finishing
                    .is_streamable(peek.peek.result_desc.arity());

                // The stash is usable only if it is both enabled by config and a persist
                // location for it was provided.
                let peek_stash_enabled = {
                    let enabled = ENABLE_PEEK_RESPONSE_STASH.get(&self.compute_state.worker_config);
                    let peek_persist_stash_available =
                        self.compute_state.peek_stash_persist_location.is_some();
                    if !peek_persist_stash_available && enabled {
                        error!("missing peek_stash_persist_location but peek stash is enabled");
                    }
                    enabled && peek_persist_stash_available
                };

                let peek_stash_threshold_bytes =
                    PEEK_RESPONSE_STASH_THRESHOLD_BYTES.get(&self.compute_state.worker_config);

                // Bundle references to the per-phase histograms, so the peek code does
                // not need access to the whole metrics struct.
                let metrics = IndexPeekMetrics {
                    seek_fulfillment_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_seek_fulfillment_seconds,
                    frontier_check_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_frontier_check_seconds,
                    error_scan_seconds: &self.compute_state.metrics.index_peek_error_scan_seconds,
                    cursor_setup_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_cursor_setup_seconds,
                    row_iteration_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_iteration_seconds,
                    result_sort_seconds: &self.compute_state.metrics.index_peek_result_sort_seconds,
                    row_collection_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_collection_seconds,
                };

                let status = peek.seek_fulfillment(
                    upper,
                    self.compute_state.max_result_size,
                    peek_stash_enabled && peek_stash_eligible,
                    peek_stash_threshold_bytes,
                    &metrics,
                );

                self.compute_state
                    .metrics
                    .index_peek_total_seconds
                    .observe(start.elapsed().as_secs_f64());

                match status {
                    PeekStatus::Ready(result) => Some(result),
                    PeekStatus::NotReady => None,
                    PeekStatus::UsePeekStash => {
                        // The result is too large to return inline: hand it off to an
                        // async upload task and track it as a `Stash` peek from now on.
                        let _span =
                            span!(parent: &peek.span, Level::DEBUG, "process_stash_peek").entered();

                        let peek_stash_batch_max_runs = PEEK_RESPONSE_STASH_BATCH_MAX_RUNS
                            .get(&self.compute_state.worker_config);

                        let stash_task = peek_stash::StashingPeek::start_upload(
                            Arc::clone(&self.compute_state.persist_clients),
                            self.compute_state
                                .peek_stash_persist_location
                                .as_ref()
                                .expect("verified above"),
                            peek.peek.clone(),
                            peek.trace_bundle.clone(),
                            peek_stash_batch_max_runs,
                        );

                        self.compute_state
                            .pending_peeks
                            .insert(peek.peek.uuid, PendingPeek::Stash(stash_task));
                        return;
                    }
                }
            }
            // Persist peeks run in a background task; check whether its result has
            // arrived yet.
            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
                self.compute_state
                    .metrics
                    .persist_peek_seconds
                    .observe(duration.as_secs_f64());
                result
            }),
            PendingPeek::Stash(stashing_peek) => {
                // Drive the upload forward by pumping another round of rows into the
                // stashing task, then check whether it has finished.
                let num_batches = PEEK_STASH_NUM_BATCHES.get(&self.compute_state.worker_config);
                let batch_size = PEEK_STASH_BATCH_SIZE.get(&self.compute_state.worker_config);
                stashing_peek.pump_rows(num_batches, batch_size);

                if let Ok((response, duration)) = stashing_peek.result.try_recv() {
                    self.compute_state
                        .metrics
                        .stashed_peek_seconds
                        .observe(duration.as_secs_f64());
                    trace!(?stashing_peek.peek, ?duration, "finished stashing peek response in persist");

                    Some(response)
                } else {
                    None
                }
            }
        };

        if let Some(response) = response {
            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek_response").entered();
            self.send_peek_response(peek, response)
        } else {
            // Not ready yet: put the peek back into the pending set, so the next
            // `process_peeks` invocation retries it.
            let uuid = peek.peek().uuid;
            self.compute_state.pending_peeks.insert(uuid, peek);
        }
    }
966
967    /// Scan pending peeks and attempt to retire each.
968    pub fn process_peeks(&mut self) {
969        let mut upper = Antichain::new();
970        let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
971        for (_uuid, peek) in pending_peeks {
972            self.process_peek(&mut upper, peek);
973        }
974    }
975
976    /// Sends a response for this peek's resolution to the coordinator.
977    ///
978    /// Note that this function takes ownership of the `PendingPeek`, which is
979    /// meant to prevent multiple responses to the same peek.
980    #[mz_ore::instrument(level = "debug")]
981    fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
982        let log_event = peek.as_log_event(false);
983        // Respond with the response.
984        self.send_compute_response(ComputeResponse::PeekResponse(
985            peek.peek().uuid,
986            response,
987            OpenTelemetryContext::obtain(),
988        ));
989
990        // Log responding to the peek request.
991        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
992            logger.log(&log_event);
993        }
994    }
995
996    /// Scan the shared subscribe response buffer, and forward results along.
997    pub fn process_subscribes(&mut self) {
998        let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
999        for (sink_id, mut response) in subscribe_responses.drain(..) {
1000            // Update frontier logging for this subscribe.
1001            if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
1002                let new_frontier = match &response {
1003                    SubscribeResponse::Batch(b) => b.upper.clone(),
1004                    SubscribeResponse::DroppedAt(_) => Antichain::new(),
1005                };
1006
1007                let reported = collection.reported_frontiers();
1008                assert!(
1009                    reported.write_frontier.allows_reporting(&new_frontier),
1010                    "subscribe write frontier regression: {:?} -> {:?}",
1011                    reported.write_frontier,
1012                    new_frontier,
1013                );
1014                assert!(
1015                    reported.input_frontier.allows_reporting(&new_frontier),
1016                    "subscribe input frontier regression: {:?} -> {:?}",
1017                    reported.input_frontier,
1018                    new_frontier,
1019                );
1020
1021                collection
1022                    .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
1023                collection
1024                    .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
1025                collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
1026            } else {
1027                // Presumably tracking state for this subscribe was already dropped by
1028                // `drop_collection`. There is nothing left to do for logging.
1029            }
1030
1031            response
1032                .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
1033            self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
1034        }
1035    }
1036
1037    /// Scan the shared copy to response buffer, and forward results along.
1038    pub fn process_copy_tos(&self) {
1039        let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
1040        for (sink_id, response) in responses.drain(..) {
1041            self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
1042        }
1043    }
1044
1045    /// Send a response to the coordinator.
1046    fn send_compute_response(&self, response: ComputeResponse) {
1047        // Ignore send errors because the coordinator is free to ignore our
1048        // responses. This happens during shutdown.
1049        let _ = self.response_tx.send(response);
1050    }
1051
1052    /// Checks for dataflow expiration. Panics if we're past the replica expiration time.
1053    pub(crate) fn check_expiration(&self) {
1054        let now = mz_ore::now::SYSTEM_TIME();
1055        if self.compute_state.replica_expiration.less_than(&now.into()) {
1056            let now_datetime = mz_ore::now::to_datetime(now);
1057            let expiration_datetime = self
1058                .compute_state
1059                .replica_expiration
1060                .as_option()
1061                .map(Into::into)
1062                .map(mz_ore::now::to_datetime);
1063
1064            // We error and assert separately to produce structured logs in anything that depends
1065            // on tracing.
1066            error!(
1067                now,
1068                now_datetime = ?now_datetime,
1069                expiration = ?self.compute_state.replica_expiration.elements(),
1070                expiration_datetime = ?expiration_datetime,
1071                "replica expired"
1072            );
1073
1074            // Repeat condition for better error message.
1075            assert!(
1076                !self.compute_state.replica_expiration.less_than(&now.into()),
1077                "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
1078                self.compute_state.replica_expiration.elements(),
1079            );
1080        }
1081    }
1082
1083    /// Returns the dataflow expiration, i.e, the timestamp beyond which diffs can be
1084    /// dropped.
1085    ///
1086    /// Returns an empty timestamp if `replica_expiration` is unset or matches conditions under
1087    /// which dataflow expiration should be disabled.
1088    pub fn determine_dataflow_expiration(
1089        &self,
1090        time_dependence: &TimeDependence,
1091        until: &Antichain<mz_repr::Timestamp>,
1092    ) -> Antichain<mz_repr::Timestamp> {
1093        // Evaluate time dependence with respect to the expiration time.
1094        // * Step time forward to ensure the expiration time is different to the moment a dataflow
1095        //   can legitimately jump to.
1096        // * We cannot expire dataflow with an until that is less or equal to the expiration time.
1097        let iter = self
1098            .compute_state
1099            .replica_expiration
1100            .iter()
1101            .filter_map(|t| time_dependence.apply(*t))
1102            .filter_map(|t| mz_repr::Timestamp::try_step_forward(&t))
1103            .filter(|expiration| !until.less_equal(expiration));
1104        Antichain::from_iter(iter)
1105    }
1106}
1107
/// A peek against either an index or a Persist collection.
///
/// Note that `PendingPeek` intentionally does not implement or derive `Clone`,
/// as each `PendingPeek` is meant to be dropped after it's responded to.
pub enum PendingPeek {
    /// A peek against an index. (Possibly a temporary index created for the purpose.)
    Index(IndexPeek),
    /// A peek against a Persist-backed collection.
    Persist(PersistPeek),
    /// A peek against an index that is being stashed in the peek stash by an
    /// async background task.
    ///
    /// Index peeks are converted into stash peeks when their result is too large
    /// to return inline (see `PeekStatus::UsePeekStash` handling in `process_peek`).
    Stash(peek_stash::StashingPeek),
}
1121
impl PendingPeek {
    /// Produces a corresponding log event.
    ///
    /// `installed` is recorded on the event; it distinguishes installation of the
    /// peek from its retirement (`send_peek_response` passes `false`).
    pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
        let peek = self.peek();
        let (id, peek_type) = match &peek.target {
            PeekTarget::Index { id } => (*id, logging::compute::PeekType::Index),
            PeekTarget::Persist { id, .. } => (*id, logging::compute::PeekType::Persist),
        };
        let uuid = peek.uuid.into_bytes();
        ComputeEvent::Peek(PeekEvent {
            id,
            time: peek.timestamp,
            uuid,
            peek_type,
            installed,
        })
    }

    /// Creates a [`PendingPeek::Index`] for a peek against an arrangement.
    ///
    /// Allows logical compaction of both the ok and err traces up to the peek
    /// timestamp, and sets the physical compaction frontier to the empty
    /// antichain (placing no restriction on physical batch organization).
    fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
        let empty_frontier = Antichain::new();
        let timestamp_frontier = Antichain::from_elem(peek.timestamp);
        trace_bundle
            .oks_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .oks_mut()
            .set_physical_compaction(empty_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_physical_compaction(empty_frontier.borrow());

        PendingPeek::Index(IndexPeek {
            peek,
            trace_bundle,
            span: tracing::Span::current(),
        })
    }

    /// Creates a [`PendingPeek::Persist`] for a peek reading directly from a
    /// persist shard.
    ///
    /// One worker is chosen (arbitrarily but consistently, by hashing the peek
    /// UUID) to perform the actual read; all other workers produce empty results.
    /// The read runs in a background task that delivers its result through a
    /// oneshot channel and then wakes the timely worker so the result is
    /// processed promptly.
    fn persist<A: Allocate>(
        peek: Peek,
        persist_clients: Arc<PersistClientCache>,
        metadata: CollectionMetadata,
        max_result_size: usize,
        timely_worker: &TimelyWorker<A>,
    ) -> Self {
        let active_worker = {
            // Choose the worker that does the actual peek arbitrarily but consistently.
            let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
            chosen_index == timely_worker.index()
        };
        let activator = timely_worker.sync_activator_for([].into());
        let peek_uuid = peek.uuid;

        let (result_tx, result_rx) = oneshot::channel();
        let timestamp = peek.timestamp;
        let mfp_plan = peek.map_filter_project.clone();
        // The read can stop early once `limit + offset` rows are collected.
        let max_results_needed = peek
            .finishing
            .limit
            .map(|l| usize::cast_from(u64::from(l)))
            .unwrap_or(usize::MAX)
            + peek.finishing.offset;
        let order_by = peek.finishing.order_by.clone();

        // Persist peeks can include at most one literal constraint.
        let literal_constraint = peek
            .literal_constraints
            .clone()
            .map(|rows| rows.into_element());

        let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
            let start = Instant::now();
            let result = if active_worker {
                PersistPeek::do_peek(
                    &persist_clients,
                    metadata,
                    timestamp,
                    literal_constraint,
                    mfp_plan,
                    max_result_size,
                    max_results_needed,
                )
                .await
            } else {
                Ok(vec![])
            };
            let result = match result {
                Ok(rows) => PeekResponse::Rows(vec![RowCollection::new(rows, &order_by)]),
                Err(e) => PeekResponse::Error(e.to_string()),
            };
            // The receiver may be gone if the peek was cancelled; in that case we
            // simply drop the result.
            match result_tx.send((result, start.elapsed())) {
                Ok(()) => {}
                Err((_result, elapsed)) => {
                    debug!(duration =? elapsed, "dropping result for cancelled peek {peek_uuid}")
                }
            }
            match activator.activate() {
                Ok(()) => {}
                Err(_) => {
                    debug!("unable to wake timely after completed peek {peek_uuid}");
                }
            }
        });
        PendingPeek::Persist(PersistPeek {
            peek,
            _abort_handle: task_handle.abort_on_drop(),
            result: result_rx,
            span: tracing::Span::current(),
        })
    }

    /// Returns the `tracing::Span` tracking this peek's operation.
    fn span(&self) -> &tracing::Span {
        match self {
            PendingPeek::Index(p) => &p.span,
            PendingPeek::Persist(p) => &p.span,
            PendingPeek::Stash(p) => &p.span,
        }
    }

    /// Returns the [`Peek`] description this pending peek is computing a
    /// response for.
    pub(crate) fn peek(&self) -> &Peek {
        match self {
            PendingPeek::Index(p) => &p.peek,
            PendingPeek::Persist(p) => &p.peek,
            PendingPeek::Stash(p) => &p.peek,
        }
    }
}
1252
/// An in-progress Persist peek.
///
/// Note that `PersistPeek` intentionally does not implement or derive `Clone`,
/// as each `PersistPeek` is meant to be dropped after it's responded to.
pub struct PersistPeek {
    /// The peek description this peek is computing a response for.
    pub(crate) peek: Peek,
    /// A background task that's responsible for producing the peek results.
    /// If we're no longer interested in the results, we abort the task.
    _abort_handle: AbortOnDropHandle<()>,
    /// The result of the background task, eventually, together with the time
    /// the task took to produce it.
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    /// The `tracing::Span` tracking this peek's operation
    span: tracing::Span,
}
1267
impl PersistPeek {
    /// Reads the peeked collection directly from persist at `as_of`, applying
    /// the MFP and optional literal constraint.
    ///
    /// Collects at most `limit_remaining` rows (counting multiplicities).
    /// Returns an error string if the shard cannot be opened or read, if the
    /// shard's since has advanced past `as_of`, if the data contains negative
    /// multiplicities, or if the accumulated result exceeds `max_result_size`
    /// bytes.
    async fn do_peek(
        persist_clients: &PersistClientCache,
        metadata: CollectionMetadata,
        as_of: Timestamp,
        literal_constraint: Option<Row>,
        mfp_plan: SafeMfpPlan,
        max_result_size: usize,
        mut limit_remaining: usize,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
        let client = persist_clients
            .open(metadata.persist_location)
            .await
            .map_err(|e| e.to_string())?;

        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
            .open_leased_reader(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("persist::peek"),
                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
            )
            .await
            .map_err(|e| e.to_string())?;

        // If we are using txn-wal for this collection, then the upper might
        // be advanced lazily and we have to go through txn-wal for reads.
        //
        // TODO: If/when we have a process-wide TxnsRead worker for clusterd,
        // use in here (instead of opening a new TxnsCache) to save a persist
        // reader registration and some txns shard read traffic.
        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
        } else {
            None
        };

        let metrics = client.metrics();

        let mut cursor = StatsCursor::new(
            &mut reader,
            txns_read.as_mut(),
            metrics,
            &mfp_plan,
            &metadata.relation_desc,
            Antichain::from_elem(as_of),
        )
        .await
        .map_err(|since| {
            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
        })?;

        // Re-used state for processing and building rows.
        let mut result = vec![];
        let mut datum_vec = DatumVec::new();
        let mut row_builder = Row::default();
        let arena = RowArena::new();
        let mut total_size = 0usize;

        // Number of leading columns the literal constraint (if any) pins down.
        let literal_len = match &literal_constraint {
            None => 0,
            Some(row) => row.iter().count(),
        };

        'collect: while limit_remaining > 0 {
            let Some(batch) = cursor.next().await else {
                break;
            };
            for (data, _, d) in batch {
                let row = data.map_err(|e| e.to_string())?;

                if let Some(literal) = &literal_constraint {
                    // This relies on the cursor yielding rows in key order: once a row
                    // compares greater than the literal prefix, no further matches can
                    // appear and we can stop scanning entirely.
                    match row.iter().take(literal_len).cmp(literal.iter()) {
                        Ordering::Less => continue,
                        Ordering::Equal => {}
                        Ordering::Greater => break 'collect,
                    }
                }

                // A negative multiplicity indicates corrupted input data.
                let count: usize = d.try_into().map_err(|_| {
                    error!(
                        shard = %metadata.data_shard, diff = d, ?row,
                        "persist peek encountered negative multiplicities",
                    );
                    format!(
                        "Invalid data in source, \
                         saw retractions ({}) for row that does not exist: {:?}",
                        -d, row,
                    )
                })?;
                let Some(count) = NonZeroUsize::new(count) else {
                    continue;
                };
                let mut datum_local = datum_vec.borrow_with(&row);
                let eval_result = mfp_plan
                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
                    .map(|row| row.cloned())
                    .map_err(|e| e.to_string())?;
                if let Some(row) = eval_result {
                    // Enforce the result size budget as we go, so we bail out before
                    // materializing an oversized result.
                    total_size = total_size
                        .saturating_add(row.byte_len())
                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
                    if total_size > max_result_size {
                        return Err(format!(
                            "result exceeds max size of {}",
                            ByteSize::b(u64::cast_from(max_result_size))
                        ));
                    }
                    result.push((row, count));
                    limit_remaining = limit_remaining.saturating_sub(count.get());
                    if limit_remaining == 0 {
                        break;
                    }
                }
            }
        }

        Ok(result)
    }
}
1389
/// An in-progress index-backed peek, and data to eventually fulfill it.
pub struct IndexPeek {
    /// The peek description this peek is computing a response for.
    peek: Peek,
    /// The data from which the trace derives.
    trace_bundle: TraceBundle,
    /// The `tracing::Span` tracking this peek's operation
    span: tracing::Span,
}
1398
/// Histogram metrics for index peek phases.
///
/// This struct bundles references to the various histogram metrics used to
/// instrument the index peek processing pipeline.
pub(crate) struct IndexPeekMetrics<'a> {
    /// Total time spent in `IndexPeek::seek_fulfillment`.
    pub seek_fulfillment_seconds: &'a prometheus::Histogram,
    /// Time spent checking trace frontiers for peek readiness.
    pub frontier_check_seconds: &'a prometheus::Histogram,
    /// Histogram for the error-trace scan phase.
    pub error_scan_seconds: &'a prometheus::Histogram,
    /// Histogram for the cursor setup phase.
    pub cursor_setup_seconds: &'a prometheus::Histogram,
    /// Histogram for the row iteration phase.
    pub row_iteration_seconds: &'a prometheus::Histogram,
    /// Histogram for the result sorting phase.
    pub result_sort_seconds: &'a prometheus::Histogram,
    /// Histogram for the row collection phase.
    pub row_collection_seconds: &'a prometheus::Histogram,
}
1412
1413impl IndexPeek {
    /// Attempts to fulfill the peek and reports success.
    ///
    /// To produce output at `peek.timestamp`, we must be certain that
    /// it is no longer changing. A trace guarantees that all future
    /// changes will be greater than or equal to an element of `upper`.
    ///
    /// If an element of `upper` is less or equal to `peek.timestamp`,
    /// then there can be further updates that would change the output.
    /// If no element of `upper` is less or equal to `peek.timestamp`,
    /// then for any time `t` less or equal to `peek.timestamp` it is
    /// not the case that `upper` is less or equal to that timestamp,
    /// and so the result cannot further evolve.
    fn seek_fulfillment(
        &mut self,
        upper: &mut Antichain<Timestamp>,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus {
        let method_start = Instant::now();

        // The peek is only ready once both the ok and err traces have advanced
        // beyond the peek timestamp.
        self.trace_bundle.oks_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }
        self.trace_bundle.errs_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }

        // If compaction has advanced past the peek timestamp, the requested time
        // is no longer readable; report an error instead.
        let read_frontier = self.trace_bundle.compaction_frontier();
        if !read_frontier.less_equal(&self.peek.timestamp) {
            let error = format!(
                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
                read_frontier.elements(),
                self.peek.timestamp,
            );
            return PeekStatus::Ready(PeekResponse::Error(error));
        }

        metrics
            .frontier_check_seconds
            .observe(method_start.elapsed().as_secs_f64());

        let result = self.collect_finished_data(
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
            metrics,
        );

        metrics
            .seek_fulfillment_seconds
            .observe(method_start.elapsed().as_secs_f64());

        result
    }
1472
    /// Collects data for a known-complete peek from the ok stream.
    ///
    /// Before reading the ok stream, scans the error trace at the peek
    /// timestamp: if any error accumulates to a positive count, that error is
    /// returned as the peek result instead of data.
    fn collect_finished_data(
        &mut self,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus {
        let error_scan_start = Instant::now();

        // Check if there exist any errors and, if so, return whatever one we
        // find first.
        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
        while cursor.key_valid(&storage) {
            // Accumulate this error's multiplicity across all times visible to
            // the peek.
            let mut copies = Diff::ZERO;
            cursor.map_times(&storage, |time, diff| {
                if time.less_equal(&self.peek.timestamp) {
                    copies += diff;
                }
            });
            // A negative count means the error trace contains retractions of
            // errors that were never inserted; surface this as an error rather
            // than silently skipping it.
            if copies.is_negative() {
                let error = cursor.key(&storage);
                error!(
                    target = %self.peek.target.id(), diff = %copies, %error,
                    "index peek encountered negative multiplicities in error trace",
                );
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "Invalid data in source errors, \
                    saw retractions ({}) for row that does not exist: {}",
                    -copies, error,
                )));
            }
            if copies.is_positive() {
                return PeekStatus::Ready(PeekResponse::Error(cursor.key(&storage).to_string()));
            }
            cursor.step_key(&storage);
        }

        metrics
            .error_scan_seconds
            .observe(error_scan_start.elapsed().as_secs_f64());

        Self::collect_ok_finished_data(
            &self.peek,
            self.trace_bundle.oks_mut(),
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
            metrics,
        )
    }
1524
    /// Collects data for a known-complete peek from the ok stream.
    ///
    /// Returns `PeekStatus::UsePeekStash` as soon as the accumulated result
    /// grows beyond `peek_stash_threshold_bytes` while the peek is
    /// stash-eligible, and an error response if it exceeds `max_result_size`.
    fn collect_ok_finished_data<Tr>(
        peek: &Peek<Timestamp>,
        oks_handle: &mut Tr,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus
    where
        for<'a> Tr: TraceReader<
                Key<'a>: ToDatumIter + Eq,
                KeyOwn = Row,
                Val<'a>: ToDatumIter,
                TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
                DiffGat<'a> = &'a Diff,
            >,
    {
        let max_result_size = usize::cast_from(max_result_size);
        // Size accounted per row for its multiplicity in the result-size
        // bookkeeping below.
        let count_byte_size = size_of::<NonZeroUsize>();

        // Cursor setup timing
        let cursor_setup_start = Instant::now();

        // We clone `literal_constraints` here because we don't want to move the constraints
        // out of the peek struct, and don't want to modify in-place.
        let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
            peek.target.id().clone(),
            peek.map_filter_project.clone(),
            peek.timestamp,
            peek.literal_constraints.clone().as_deref_mut(),
            oks_handle,
        );

        metrics
            .cursor_setup_seconds
            .observe(cursor_setup_start.elapsed().as_secs_f64());

        // Accumulated `Vec<(row, count)>` results that we are likely to return.
        let mut results = Vec::new();
        let mut total_size: usize = 0;

        // When set, a bound on the number of records we need to return.
        // The requirements on the records are driven by the finishing's
        // `order_by` field. Further limiting will happen when the results
        // are collected, so we don't need to have exactly this many results,
        // just at least those results that would have been returned.
        let max_results = peek.finishing.num_rows_needed();

        let comparator = RowComparator::new(peek.finishing.order_by.as_slice());

        // Row iteration timing
        let row_iteration_start = Instant::now();
        let mut sort_time_accum = Duration::ZERO;

        while let Some(row) = peek_iterator.next() {
            let row = match row {
                Ok(row) => row,
                Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)),
            };
            let (row, copies) = row;
            // NOTE(review): the iterator presumably only yields rows with
            // positive multiplicity, which is what this `expect` relies on —
            // confirm against `PeekResultIterator`.
            let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");

            total_size = total_size
                .saturating_add(row.byte_len())
                .saturating_add(count_byte_size);
            // Prefer spilling to the peek result stash over erroring out, when
            // allowed.
            if peek_stash_eligible && total_size > peek_stash_threshold_bytes {
                return PeekStatus::UsePeekStash;
            }
            if total_size > max_result_size {
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "result exceeds max size of {}",
                    ByteSize::b(u64::cast_from(max_result_size))
                )));
            }

            results.push((row, copies));

            // If we hold many more than `max_results` records, we can thin down
            // `results` using `self.finishing.ordering`.
            if let Some(max_results) = max_results {
                // We use a threshold twice what we intend, to amortize the work
                // across all of the insertions. We could tighten this, but it
                // works for the moment.
                if results.len() >= 2 * max_results {
                    if peek.finishing.order_by.is_empty() {
                        // With no ordering, any `max_results` rows are as good
                        // as any others, so we can return immediately.
                        results.truncate(max_results);
                        metrics
                            .row_iteration_seconds
                            .observe(row_iteration_start.elapsed().as_secs_f64());
                        metrics
                            .result_sort_seconds
                            .observe(sort_time_accum.as_secs_f64());
                        let row_collection_start = Instant::now();
                        let collection = RowCollection::new(results, &peek.finishing.order_by);
                        metrics
                            .row_collection_seconds
                            .observe(row_collection_start.elapsed().as_secs_f64());
                        return PeekStatus::Ready(PeekResponse::Rows(vec![collection]));
                    } else {
                        // We can sort `results` and then truncate to `max_results`.
                        // This has an effect similar to a priority queue, without
                        // its interactive dequeueing properties.
                        // TODO: Had we left these as `Vec<Datum>` we would avoid
                        // the unpacking; we should consider doing that, although
                        // it will require a re-pivot of the code to branch on this
                        // inner test (as we prefer not to maintain `Vec<Datum>`
                        // in the other case).
                        let sort_start = Instant::now();
                        results.sort_by(|left, right| {
                            comparator.compare_rows(&left.0, &right.0, || left.0.cmp(&right.0))
                        });
                        sort_time_accum += sort_start.elapsed();
                        // Subtract the dropped rows' sizes from the running
                        // result-size total.
                        let dropped = results.drain(max_results..);
                        let dropped_size =
                            dropped.into_iter().fold(0, |acc: usize, (row, _count)| {
                                acc.saturating_add(row.byte_len().saturating_add(count_byte_size))
                            });
                        total_size = total_size.saturating_sub(dropped_size);
                    }
                }
            }
        }

        metrics
            .row_iteration_seconds
            .observe(row_iteration_start.elapsed().as_secs_f64());
        metrics
            .result_sort_seconds
            .observe(sort_time_accum.as_secs_f64());

        let row_collection_start = Instant::now();
        let collection = RowCollection::new(results, &peek.finishing.order_by);
        metrics
            .row_collection_seconds
            .observe(row_collection_start.elapsed().as_secs_f64());
        PeekStatus::Ready(PeekResponse::Rows(vec![collection]))
    }
1663}
1664
/// For keeping track of the state of pending or ready peeks, and managing
/// control flow.
///
/// Produced by the peek fulfillment methods above.
enum PeekStatus {
    /// The frontiers of objects are not yet advanced enough, peek is still
    /// pending.
    NotReady,
    /// The result size is above the configured threshold and the peek is
    /// eligible for using the peek result stash.
    UsePeekStash,
    /// The peek result is ready.
    Ready(PeekResponse),
}
1677
/// The frontiers we have reported to the controller for a collection.
///
/// Each frontier is tracked separately; see [`ReportedFrontier`] for the
/// per-frontier reporting state.
#[derive(Debug)]
struct ReportedFrontiers {
    /// The reported write frontier.
    write_frontier: ReportedFrontier,
    /// The reported input frontier.
    input_frontier: ReportedFrontier,
    /// The reported output frontier.
    output_frontier: ReportedFrontier,
}
1688
1689impl ReportedFrontiers {
1690    /// Creates a new `ReportedFrontiers` instance.
1691    fn new() -> Self {
1692        Self {
1693            write_frontier: ReportedFrontier::new(),
1694            input_frontier: ReportedFrontier::new(),
1695            output_frontier: ReportedFrontier::new(),
1696        }
1697    }
1698}
1699
/// A frontier we have reported to the controller, or the least frontier we are allowed to report.
///
/// Tracking the previously reported frontier (or a lower bound, if nothing
/// has been reported yet) lets `allows_reporting` reject non-advancing
/// reports.
#[derive(Clone, Debug)]
pub enum ReportedFrontier {
    /// A frontier has been previously reported.
    Reported(Antichain<Timestamp>),
    /// No frontier has been reported yet.
    NotReported {
        /// A lower bound for frontiers that may be reported in the future.
        lower: Antichain<Timestamp>,
    },
}
1711
1712impl ReportedFrontier {
1713    /// Create a new `ReportedFrontier` enforcing the minimum lower bound.
1714    pub fn new() -> Self {
1715        let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
1716        Self::NotReported { lower }
1717    }
1718
1719    /// Whether the reported frontier is the empty frontier.
1720    pub fn is_empty(&self) -> bool {
1721        match self {
1722            Self::Reported(frontier) => frontier.is_empty(),
1723            Self::NotReported { .. } => false,
1724        }
1725    }
1726
1727    /// Whether this `ReportedFrontier` allows reporting the given frontier.
1728    ///
1729    /// A `ReportedFrontier` allows reporting of another frontier if:
1730    ///  * The other frontier is greater than the reported frontier.
1731    ///  * The other frontier is greater than or equal to the lower bound.
1732    fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
1733        match self {
1734            Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
1735            Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
1736        }
1737    }
1738}
1739
/// State maintained for a compute collection.
pub struct CollectionState {
    /// Tracks the frontiers that have been reported to the controller.
    reported_frontiers: ReportedFrontiers,
    /// The index of the dataflow computing this collection.
    ///
    /// Used for dropping the dataflow when the collection is dropped.
    /// The dataflow index is wrapped in an `Rc` and can be shared between collections, to reflect
    /// the possibility that a single dataflow can export multiple collections.
    dataflow_index: Rc<usize>,
    /// Whether this collection is a subscribe or copy-to.
    ///
    /// The compute protocol does not allow `Frontiers` responses for subscribe and copy-to
    /// collections, so we need to be able to recognize them. This is something we would like to
    /// change in the future (database-issues#4701).
    pub is_subscribe_or_copy: bool,
    /// The collection's initial as-of frontier.
    ///
    /// Used to determine hydration status: the collection counts as hydrated
    /// once its reported output frontier has advanced beyond this as-of.
    as_of: Antichain<Timestamp>,

    /// A token that should be dropped when this collection is dropped to clean up associated
    /// sink state.
    ///
    /// Only `Some` if the collection is a sink.
    pub sink_token: Option<SinkToken>,
    /// Frontier of sink writes.
    ///
    /// Only `Some` if the collection is a sink and *not* a subscribe.
    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
    /// Frontier probes for every input to the collection.
    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
    /// A probe reporting the frontier of times through which all collection outputs have been
    /// computed (but not necessarily written).
    ///
    /// `None` for collections with compute frontiers equal to their write frontiers.
    pub compute_probe: Option<probe::Handle<Timestamp>>,
    /// Logging state maintained for this collection.
    logging: Option<CollectionLogging>,
    /// Metrics tracked for this collection.
    metrics: CollectionMetrics,
    /// Send-side to transition a dataflow from read-only mode to read-write mode.
    ///
    /// All dataflows start in read-only mode. Only after receiving a
    /// `AllowWrites` command from the controller will they transition to
    /// read-write mode.
    ///
    /// A dataflow in read-only mode must not affect any external state.
    ///
    /// NOTE: In the future, we might want a more complicated flag, for example
    /// something that tells us after which timestamp we are allowed to write.
    /// In this first version we are keeping things as simple as possible!
    read_only_tx: watch::Sender<bool>,
    /// Receive-side to observe whether a dataflow is in read-only mode.
    pub read_only_rx: watch::Receiver<bool>,
}
1796
1797impl CollectionState {
1798    fn new(
1799        dataflow_index: Rc<usize>,
1800        is_subscribe_or_copy: bool,
1801        as_of: Antichain<Timestamp>,
1802        metrics: CollectionMetrics,
1803    ) -> Self {
1804        // We always initialize as read_only=true. Only when we're explicitly
1805        // allowed to we switch to read-write.
1806        let (read_only_tx, read_only_rx) = watch::channel(true);
1807
1808        Self {
1809            reported_frontiers: ReportedFrontiers::new(),
1810            dataflow_index,
1811            is_subscribe_or_copy,
1812            as_of,
1813            sink_token: None,
1814            sink_write_frontier: None,
1815            input_probes: Default::default(),
1816            compute_probe: None,
1817            logging: None,
1818            metrics,
1819            read_only_tx,
1820            read_only_rx,
1821        }
1822    }
1823
1824    /// Return the frontiers that have been reported to the controller.
1825    fn reported_frontiers(&self) -> &ReportedFrontiers {
1826        &self.reported_frontiers
1827    }
1828
1829    /// Reset all reported frontiers to the given value.
1830    pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
1831        self.reported_frontiers.write_frontier = frontier.clone();
1832        self.reported_frontiers.input_frontier = frontier.clone();
1833        self.reported_frontiers.output_frontier = frontier;
1834    }
1835
1836    /// Set the write frontier that has been reported to the controller.
1837    fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
1838        if let Some(logging) = &mut self.logging {
1839            let time = match &frontier {
1840                ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
1841                ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
1842            };
1843            logging.set_frontier(time);
1844        }
1845
1846        self.reported_frontiers.write_frontier = frontier;
1847    }
1848
1849    /// Set the input frontier that has been reported to the controller.
1850    fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
1851        // Use this opportunity to update our input frontier logging.
1852        if let Some(logging) = &mut self.logging {
1853            for (id, probe) in &self.input_probes {
1854                let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
1855                logging.set_import_frontier(*id, new_time);
1856            }
1857        }
1858
1859        self.reported_frontiers.input_frontier = frontier;
1860    }
1861
1862    /// Set the output frontier that has been reported to the controller.
1863    fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
1864        let already_hydrated = self.hydrated();
1865
1866        self.reported_frontiers.output_frontier = frontier;
1867
1868        if !already_hydrated && self.hydrated() {
1869            if let Some(logging) = &mut self.logging {
1870                logging.set_hydrated();
1871            }
1872            self.metrics.record_collection_hydrated();
1873        }
1874    }
1875
1876    /// Return whether this collection is hydrated.
1877    fn hydrated(&self) -> bool {
1878        match &self.reported_frontiers.output_frontier {
1879            ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
1880            ReportedFrontier::NotReported { .. } => false,
1881        }
1882    }
1883
1884    /// Allow writes for this collection.
1885    fn allow_writes(&self) {
1886        info!(
1887            dataflow_index = *self.dataflow_index,
1888            export = ?self.logging.as_ref().map(|l| l.export_id()),
1889            "allowing writes for dataflow",
1890        );
1891        let _ = self.read_only_tx.send(false);
1892    }
1893}