mz_transform/dataflow.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Whole-dataflow optimization
//!
//! A dataflow may contain multiple views, each of which may only be
//! optimized locally. However, information like demand and predicate
//! pushdown can be applied across views once we understand the context
//! in which the views will be executed.

use std::collections::{BTreeMap, BTreeSet};

use itertools::Itertools;
use mz_compute_types::dataflows::{BuildDesc, DataflowDesc, DataflowDescription, IndexImport};
use mz_expr::{
    AccessStrategy, CollectionPlan, Id, JoinImplementation, LocalId, MapFilterProject,
    MirRelationExpr, MirScalarExpr, RECURSION_LIMIT,
};
use mz_ore::stack::{CheckedRecursion, RecursionGuard, RecursionLimitError};
use mz_ore::{assert_none, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
use mz_repr::GlobalId;
use mz_repr::explain::{DeltaJoinIndexUsageType, IndexUsageType, UsedIndexes};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};

use crate::monotonic::MonotonicFlag;
use crate::notice::RawOptimizerNotice;
use crate::{IndexOracle, Optimizer, TransformCtx, TransformError};

/// Optimizes the implementation of each dataflow.
///
/// Inlines views, performs a full optimization pass including physical
/// planning using the supplied indexes, propagates filtering and projection
/// information to dataflow sources, and lifts monotonicity information.
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "global")
)]
pub fn optimize_dataflow(
    dataflow: &mut DataflowDesc,
    transform_ctx: &mut TransformCtx,
    fast_path_optimizer: bool,
) -> Result<(), TransformError> {
    // Inline views that are used in only one other view.
    inline_views(dataflow)?;

    if fast_path_optimizer {
        optimize_dataflow_relations(
            dataflow,
            &Optimizer::fast_path_optimizer(transform_ctx),
            transform_ctx,
        )?;
    } else {
        // Logical optimization pass after view inlining.
        optimize_dataflow_relations(
            dataflow,
            #[allow(deprecated)]
            &Optimizer::logical_optimizer(transform_ctx),
            transform_ctx,
        )?;

        optimize_dataflow_filters(dataflow)?;
        // TODO: When the linear operator contract ensures that propagated
        // predicates are always applied, projections and filters can be
        // removed from where they come from.
        // TODO: Once projections and filters can be removed, it would be
        // useful to optimize demand after the filters, so that demand only
        // includes the columns that are still necessary after the filters
        // are applied.
        optimize_dataflow_demand(dataflow)?;

        // A smaller logical optimization pass after projections and filters are
        // pushed down across views.
        optimize_dataflow_relations(
            dataflow,
            &Optimizer::logical_cleanup_pass(transform_ctx, false),
            transform_ctx,
        )?;

        // Physical optimization pass.
        optimize_dataflow_relations(
            dataflow,
            &Optimizer::physical_optimizer(transform_ctx),
            transform_ctx,
        )?;

        optimize_dataflow_monotonic(dataflow, transform_ctx)?;
    }

    prune_and_annotate_dataflow_index_imports(
        dataflow,
        transform_ctx.indexes,
        transform_ctx.df_meta,
    )?;

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Inline views used in one other view, and in no exported objects.
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "inline_views")
)]
fn inline_views(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
    // We cannot inline anything whose `BuildDesc::id` appears in either the
    // `index_exports` or `sink_exports` of `dataflow`, because we lose our
    // ability to name it.

    // A view can and should be inlined into another view if it is used by
    // exactly one subsequent view. If it is used by two distinct views that
    // have not themselves been merged, then it does not get inlined.

    // Starting from the *last* object to build, walk backwards and inline
    // any view that is referenced neither by `index_exports` nor by
    // `sink_exports`, and that is referenced by exactly one of the other
    // objects to build.
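    //
    // For example (a hypothetical sketch, not any particular plan): if
    // `objects_to_build` contains
    //   v1: <expr1>
    //   v2: Join(Get(v1), ...)
    // and `v1` is neither exported nor referenced anywhere else, then `v2` is
    // rewritten to
    //   Let l0 = <expr1> in Join(Get(l0), ...)
    // and `v1` is removed from `objects_to_build`.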

    for index in (0..dataflow.objects_to_build.len()).rev() {
        // Capture the name used by others to reference this view.
        let global_id = dataflow.objects_to_build[index].id;
        // Determine if any exports directly reference this view.
        let mut occurs_in_export = false;
        for (_gid, sink_desc) in dataflow.sink_exports.iter() {
            if sink_desc.from == global_id {
                occurs_in_export = true;
            }
        }
        for (_, (index_desc, _)) in dataflow.index_exports.iter() {
            if index_desc.on_id == global_id {
                occurs_in_export = true;
            }
        }
        // Count the number of subsequent views that reference this view.
        let mut occurrences_in_later_views = Vec::new();
        for other in (index + 1)..dataflow.objects_to_build.len() {
            if dataflow.objects_to_build[other]
                .plan
                .depends_on()
                .contains(&global_id)
            {
                occurrences_in_later_views.push(other);
            }
        }
        // Inline if the view is referenced in one view and no exports.
        if !occurs_in_export && occurrences_in_later_views.len() == 1 {
            let other = occurrences_in_later_views[0];
            // We can remove this view and insert it in the later view,
            // but are not able to relocate the later view `other`.

            // When splicing in the `index` view, we need to create disjoint
            // identifiers for the Let's `body` and `value`, as well as a new
            // identifier for the binding itself. Following `NormalizeLets`, we
            // go with the binding first, then the value, then the body.
            let mut id_gen = crate::IdGen::default();
            let new_local = LocalId::new(id_gen.allocate_id());
            // Use the same `id_gen` to assign new identifiers to `index`.
            crate::normalize_lets::renumber_bindings(
                dataflow.objects_to_build[index].plan.as_inner_mut(),
                &mut id_gen,
            )?;
            // Assign new identifiers to the other relation.
            crate::normalize_lets::renumber_bindings(
                dataflow.objects_to_build[other].plan.as_inner_mut(),
                &mut id_gen,
            )?;
            // Install the `new_local` name wherever `global_id` was used.
            dataflow.objects_to_build[other]
                .plan
                .as_inner_mut()
                .visit_pre_mut(|expr| {
                    if let MirRelationExpr::Get { id, .. } = expr {
                        if id == &Id::Global(global_id) {
                            *id = Id::Local(new_local);
                        }
                    }
                });

            // With identifiers rewritten, we can replace `other` with
            // a `MirRelationExpr::Let` binding, whose value is `index` and
            // whose body is `other`.
            let body = dataflow.objects_to_build[other]
                .plan
                .as_inner_mut()
                .take_dangerous();
            let value = dataflow.objects_to_build[index]
                .plan
                .as_inner_mut()
                .take_dangerous();
            *dataflow.objects_to_build[other].plan.as_inner_mut() = MirRelationExpr::Let {
                id: new_local,
                value: Box::new(value),
                body: Box::new(body),
            };
            dataflow.objects_to_build.remove(index);
        }
    }

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Performs either the logical or the physical optimization pass on the
/// dataflow using the supplied set of indexes.
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = optimizer.name)
)]
fn optimize_dataflow_relations(
    dataflow: &mut DataflowDesc,
    optimizer: &Optimizer,
    ctx: &mut TransformCtx,
) -> Result<(), TransformError> {
    // Re-optimize each dataflow.
    for object in dataflow.objects_to_build.iter_mut() {
        // Re-run all optimizations on the composite views.
        ctx.set_global_id(object.id);
        optimizer.transform(object.plan.as_inner_mut(), ctx)?;
        ctx.reset_global_id();
    }

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Pushes demand information from published outputs to dataflow inputs,
/// projecting away unnecessary columns.
///
/// Dataflows that exist for the sake of generating plan explanations do not
/// have published outputs. In this case, we push demand information from views
/// not depended on by other views to dataflow inputs.
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "demand")
)]
fn optimize_dataflow_demand(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
    // Maps id -> union of known columns demanded from the source/view with the
    // corresponding id.
    let mut demand = BTreeMap::new();

    if dataflow.index_exports.is_empty() && dataflow.sink_exports.is_empty() {
        // In the absence of any exports, just demand all columns from views
        // that are not depended on by another view, which is currently the last
        // object in `objects_to_build`.

        // A DataflowDesc without exports is currently created in the context of
        // EXPLAIN outputs. This ensures that the output has all the columns of
        // the original explainee.
        if let Some(build_desc) = dataflow.objects_to_build.iter_mut().rev().next() {
            demand
                .entry(Id::Global(build_desc.id))
                .or_insert_with(BTreeSet::new)
                .extend(0..build_desc.plan.as_inner_mut().arity());
        }
    } else {
        // Demand all columns of inputs to sinks.
        for (_id, sink) in dataflow.sink_exports.iter() {
            let input_id = sink.from;
            demand
                .entry(Id::Global(input_id))
                .or_insert_with(BTreeSet::new)
                .extend(0..dataflow.arity_of(&input_id));
        }

        // Demand all columns of inputs to exported indexes.
        for (_id, (desc, _typ)) in dataflow.index_exports.iter() {
            let input_id = desc.on_id;
            demand
                .entry(Id::Global(input_id))
                .or_insert_with(BTreeSet::new)
                .extend(0..dataflow.arity_of(&input_id));
        }
    }

    optimize_dataflow_demand_inner(
        dataflow
            .objects_to_build
            .iter_mut()
            .rev()
            .map(|build_desc| (Id::Global(build_desc.id), build_desc.plan.as_inner_mut())),
        &mut demand,
    )?;

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Pushes demand through views in `view_sequence` in order, removing
/// columns not demanded.
///
/// This method is made public for the sake of testing.
/// TODO: make this private once we allow multiple exports per dataflow.
pub fn optimize_dataflow_demand_inner<'a, I>(
    view_sequence: I,
    demand: &mut BTreeMap<Id, BTreeSet<usize>>,
) -> Result<(), TransformError>
where
    I: Iterator<Item = (Id, &'a mut MirRelationExpr)>,
{
    // Maps id -> the projection that was pushed down on the view with the
    // corresponding id.
    let mut applied_projection = BTreeMap::new();
    // Collect the mutable references to views after pushing projection down
    // in order to run cleanup actions on them in a second loop.
    let mut view_refs = Vec::new();
    let projection_pushdown = crate::movement::ProjectionPushdown::default();
    for (id, view) in view_sequence {
        if let Some(columns) = demand.get(&id) {
            let projection_pushed_down = columns.iter().copied().collect();
            // Push down the projection consisting of the entries of `columns`
            // in increasing order.
            projection_pushdown.action(view, &projection_pushed_down, demand)?;
            let new_type = view.typ();
            applied_projection.insert(id, (projection_pushed_down, new_type));
        }
        view_refs.push(view);
    }

    for view in view_refs {
        // Update `Get` nodes to reflect any columns that have been projected away.
        projection_pushdown.update_projection_around_get(view, &applied_projection);
    }

    Ok(())
}
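
// A minimal, self-contained sketch (not part of the real pass) of the idea
// behind backward demand propagation: a hypothetical "view" here is just a
// projection, i.e., a mapping from its output columns to the columns of its
// single input. Walking views from last-built to first-built translates the
// columns demanded of each view into columns demanded of its input, mirroring
// how `optimize_dataflow_demand_inner` threads the `demand` map through
// `ProjectionPushdown`. All names and types in this module are illustrative
// assumptions, not the real `MirRelationExpr` machinery.
#[cfg(test)]
mod demand_propagation_sketch {
    use std::collections::{BTreeMap, BTreeSet};

    /// view id -> (input id, projection: output column -> input column)
    type Views = BTreeMap<u64, (u64, Vec<usize>)>;

    /// Push the demand recorded for each view backwards onto its input.
    fn push_demand(
        views: &Views,
        exports: &BTreeMap<u64, BTreeSet<usize>>,
    ) -> BTreeMap<u64, BTreeSet<usize>> {
        let mut demand = exports.clone();
        // Process views in reverse build order (descending id, in this model).
        for (&id, (input, projection)) in views.iter().rev() {
            let cols = demand.get(&id).cloned().unwrap_or_default();
            let entry = demand.entry(*input).or_default();
            for c in cols {
                // Column `c` of the view is column `projection[c]` of the input.
                entry.insert(projection[c]);
            }
        }
        demand
    }

    #[test]
    fn demand_flows_to_the_source() {
        // View 1 projects source 0's columns [2, 0]; the export reads only
        // view 1's column 0, so only source column 2 is demanded.
        let views = Views::from([(1, (0, vec![2, 0]))]);
        let exports = BTreeMap::from([(1, BTreeSet::from([0]))]);
        let demand = push_demand(&views, &exports);
        assert_eq!(demand[&0], BTreeSet::from([2]));
    }
}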

/// Pushes predicates to dataflow inputs.
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "filters")
)]
fn optimize_dataflow_filters(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
    // Contains an id -> predicates map, describing those predicates that
    // can (but need not) be applied to the collection named by `id`.
    let mut predicates = BTreeMap::<Id, BTreeSet<mz_expr::MirScalarExpr>>::new();

    // Propagate predicate information from outputs to inputs.
    optimize_dataflow_filters_inner(
        dataflow
            .objects_to_build
            .iter_mut()
            .rev()
            .map(|build_desc| (Id::Global(build_desc.id), build_desc.plan.as_inner_mut())),
        &mut predicates,
    )?;

    // Push predicate information into the SourceDesc.
    for (source_id, (source, _monotonic, _upper)) in dataflow.source_imports.iter_mut() {
        if let Some(list) = predicates.remove(&Id::Global(*source_id)) {
            if !list.is_empty() {
                // Canonicalize the order of predicates, for stable plans.
                let mut list = list.into_iter().collect::<Vec<_>>();
                list.sort();
                // Install no-op predicate information if none exists.
                if source.arguments.operators.is_none() {
                    source.arguments.operators = Some(MapFilterProject::new(source.typ.arity()));
                }
                // Add any predicates that can be pushed to the source.
                if let Some(operator) = source.arguments.operators.take() {
                    let mut operator = operator.filter(list);
                    operator.optimize();
                    source.arguments.operators = Some(operator);
                }
            }
        }
    }

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Pushes filters down through views in `view_iter` in order.
///
/// This method is made public for the sake of testing.
/// TODO: make this private once we allow multiple exports per dataflow.
pub fn optimize_dataflow_filters_inner<'a, I>(
    view_iter: I,
    predicates: &mut BTreeMap<Id, BTreeSet<mz_expr::MirScalarExpr>>,
) -> Result<(), TransformError>
where
    I: Iterator<Item = (Id, &'a mut MirRelationExpr)>,
{
    let transform = crate::predicate_pushdown::PredicatePushdown::default();
    for (id, view) in view_iter {
        if let Some(list) = predicates.get(&id) {
            if !list.is_empty() {
                *view = view.take_dangerous().filter(list.iter().cloned());
            }
        }
        transform.action(view, predicates)?;
    }
    Ok(())
}
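
// A minimal, self-contained sketch (illustrative assumptions only, not the
// real `PredicatePushdown` transform): each hypothetical "view" reads one
// input and applies some predicates. Walking views from last-built to
// first-built accumulates, per collection id, every predicate that may be
// applied at that collection, which is the role the `predicates` map plays in
// `optimize_dataflow_filters_inner` above.
#[cfg(test)]
mod filter_pushdown_sketch {
    use std::collections::{BTreeMap, BTreeSet};

    /// Each view is (view id, input id, predicates it applies to its input).
    /// Predicates are plain strings in this model.
    fn push_filters(
        views: &[(u64, u64, Vec<&'static str>)],
    ) -> BTreeMap<u64, BTreeSet<&'static str>> {
        let mut predicates: BTreeMap<u64, BTreeSet<&'static str>> = BTreeMap::new();
        // Process views from last-built to first-built, as the real pass does.
        for (id, input, preds) in views.iter().rev() {
            // Predicates known to hold on this view's output also hold on its
            // input here, because these sketched views are pure filters.
            let mut pushed: BTreeSet<_> = predicates.get(id).cloned().unwrap_or_default();
            pushed.extend(preds.iter().copied());
            predicates.entry(*input).or_default().extend(pushed);
        }
        predicates
    }

    #[test]
    fn predicates_accumulate_at_the_source() {
        // View 1 reads source 0 and filters on "a > 7"; view 2 reads view 1
        // and filters on "b = 3". Both predicates can be pushed to source 0.
        let views = [(1, 0, vec!["a > 7"]), (2, 1, vec!["b = 3"])];
        let predicates = push_filters(&views);
        assert_eq!(predicates[&0], BTreeSet::from(["a > 7", "b = 3"]));
    }
}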

/// Propagates information about monotonic inputs through operators,
/// using [`mz_repr::optimize::OptimizerFeatures`] from `ctx` for [`crate::analysis::Analysis`].
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "monotonic")
)]
pub fn optimize_dataflow_monotonic(
    dataflow: &mut DataflowDesc,
    ctx: &mut TransformCtx,
) -> Result<(), TransformError> {
    let mut monotonic_ids = BTreeSet::new();
    for (source_id, (_source, is_monotonic, _upper)) in dataflow.source_imports.iter() {
        if *is_monotonic {
            monotonic_ids.insert(source_id.clone());
        }
    }
    for (
        _index_id,
        IndexImport {
            desc: index_desc,
            monotonic,
            ..
        },
    ) in dataflow.index_imports.iter()
    {
        if *monotonic {
            monotonic_ids.insert(index_desc.on_id.clone());
        }
    }

    let monotonic_flag = MonotonicFlag::default();

    for build_desc in dataflow.objects_to_build.iter_mut() {
        monotonic_flag.transform(build_desc.plan.as_inner_mut(), ctx, &monotonic_ids)?;
    }

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Restricts the indexes imported by `dataflow` to only the ones it needs.
/// It also adds to the `DataflowMetainfo` how each index will be used.
/// It also annotates global `Get`s with whether they will be reads from Persist or an index, plus
/// their index usage types.
///
/// The input `dataflow` should import all indexes belonging to all views/sources/tables it
/// references.
///
/// The input plans should be normalized with `NormalizeLets`! Otherwise, we might find dangling
/// `ArrangeBy`s at the top of unused Let bindings.
#[mz_ore::instrument(
    target = "optimizer",
    level = "debug",
    fields(path.segment = "index_imports")
)]
fn prune_and_annotate_dataflow_index_imports(
    dataflow: &mut DataflowDesc,
    indexes: &dyn IndexOracle,
    dataflow_metainfo: &mut DataflowMetainfo,
) -> Result<(), TransformError> {
    // Preparation.
    // Let's save the unique keys of the sources. This will inform which indexes to choose for full
    // scans. (We can't get this info from `source_imports`, because `source_imports` only has those
    // sources that are not getting an indexed read.)
    let mut source_keys = BTreeMap::new();
    for build_desc in dataflow.objects_to_build.iter() {
        build_desc
            .plan
            .as_inner()
            .visit_pre(|expr: &MirRelationExpr| match expr {
                MirRelationExpr::Get {
                    id: Id::Global(global_id),
                    typ,
                    ..
                } => {
                    source_keys.entry(*global_id).or_insert_with(|| {
                        typ.keys
                            .iter()
                            .map(|key| {
                                key.iter()
                                    // Convert the Vec<usize> key to Vec<MirScalarExpr>, so that
                                    // later we can more easily compare index keys to these keys.
                                    .map(|col_idx| MirScalarExpr::Column(*col_idx))
                                    .collect()
                            })
                            .collect()
                    });
                }
                _ => {}
            });
    }

    // This will be a mapping of
    // (ids used by exports and objects to build) ->
    // (arrangement keys and usage types on that id that have been requested)
    let mut index_reqs_by_id = BTreeMap::new();

    // Go through the MIR plans of `objects_to_build` and collect which arrangements are requested
    // for which we also have an available index.
    for build_desc in dataflow.objects_to_build.iter_mut() {
        CollectIndexRequests::new(&source_keys, indexes, &mut index_reqs_by_id)
            .collect_index_reqs(build_desc.plan.as_inner_mut())?;
    }

    // Collect index usages by `sink_exports`.
    // A sink export sometimes wants to directly use an imported index. I know of one case where
    // this happens: The dataflow for a SUBSCRIBE on an indexed view won't have any
    // `objects_to_build`, but will want to directly read from the index and write to a sink.
    for (_sink_id, sink_desc) in dataflow.sink_exports.iter() {
        // First, let's see if there exists an index on the id that the sink wants. If not, there is
        // nothing we can do here.
        if let Some((idx_id, arbitrary_idx_key)) = indexes.indexes_on(sink_desc.from).next() {
            // If yes, then we'll add a request of _some_ index: If we already collected an index
            // request on this id, then use that, otherwise use the above arbitrarily picked index.
            let requested_idxs = index_reqs_by_id
                .entry(sink_desc.from)
                .or_insert_with(Vec::new);
            if let Some((already_req_idx_id, already_req_key, _)) = requested_idxs.get(0) {
                requested_idxs.push((
                    *already_req_idx_id,
                    already_req_key.clone(),
                    IndexUsageType::SinkExport,
                ));
            } else {
                requested_idxs.push((
                    idx_id,
                    arbitrary_idx_key.to_owned(),
                    IndexUsageType::SinkExport,
                ));
            }
        }
    }

    // Collect index usages by `index_exports`.
    for (_id, (index_desc, _)) in dataflow.index_exports.iter() {
        // First, let's see if there exists an index on the id that the exported index is on. If
        // not, there is nothing we can do here.
        if let Some((idx_id, arbitrary_index_key)) = indexes.indexes_on(index_desc.on_id).next() {
            // If yes, then we'll add an index request of some index: If we already collected an
            // index request on this id, then use that, otherwise use the above arbitrarily picked
            // index.
            let requested_idxs = index_reqs_by_id
                .entry(index_desc.on_id)
                .or_insert_with(Vec::new);
            if let Some((already_req_idx_id, already_req_key, _)) = requested_idxs.get(0) {
                requested_idxs.push((
                    *already_req_idx_id,
                    already_req_key.clone(),
                    IndexUsageType::IndexExport,
                ));
            } else {
                // This is surprising: Actually, an index creation dataflow always has a plan in
                // `objects_to_build` that will have a Get of the object that the index is on (see
                // `DataflowDescription::export_index`). Therefore, we should have already requested
                // an index usage when seeing that Get in `CollectIndexRequests`.
                soft_panic_or_log!(
                    "We are seeing an index export on an id that's not mentioned in `objects_to_build`"
                );
                requested_idxs.push((
                    idx_id,
                    arbitrary_index_key.to_owned(),
                    IndexUsageType::IndexExport,
                ));
            }
        }
    }

    // By now, `index_reqs_by_id` has all ids that we think might benefit from having an index on.
    // Moreover, for each of these ids, if any index exists on it, then we should have already
    // picked one. If not, then we have a bug somewhere. In that case, do a soft panic, and add an
    // Unknown usage, picking an arbitrary index.
    for (id, index_reqs) in index_reqs_by_id.iter_mut() {
        if index_reqs.is_empty() {
            // Try to pick an arbitrary index to be fully scanned.
            if let Some((idx_id, key)) = indexes.indexes_on(*id).next() {
                soft_panic_or_log!(
                    "prune_and_annotate_dataflow_index_imports didn't find any index for an id, even though one exists
id: {}, key: {:?}",
                    id,
                    key
                );
                index_reqs.push((idx_id, key.to_owned(), IndexUsageType::Unknown));
            }
        }
    }

    // Adjust FullScans to not introduce a new index dependency if there is also some non-FullScan
    // request on the same id.
    // `full_scan_changes` saves the changes that we do: Each (Get id, index id) entry indicates
    // that if a Get has that id, then any full scan index accesses on it should be changed to use
    // the indicated index id.
    let mut full_scan_changes = BTreeMap::new();
    for (get_id, index_reqs) in index_reqs_by_id.iter_mut() {
        // Let's choose a non-FullScan access (if one exists).
        if let Some((picked_idx, picked_idx_key)) = choose_index(
            &source_keys,
            get_id,
            &index_reqs
                .iter()
                .filter_map(|(idx_id, key, usage_type)| match usage_type {
                    IndexUsageType::FullScan => None,
                    _ => Some((*idx_id, key.clone())),
                })
                .collect_vec(),
        ) {
            // Found a non-FullScan access. Modify all FullScans to use the same index as that one.
            for (idx_id, key, usage_type) in index_reqs {
                match usage_type {
                    IndexUsageType::FullScan => {
                        full_scan_changes.insert(get_id, picked_idx);
                        *idx_id = picked_idx;
                        key.clone_from(&picked_idx_key);
                    }
                    _ => {}
                }
            }
        }
    }
    // Apply the above full scan changes to the Gets as well.
    for build_desc in dataflow.objects_to_build.iter_mut() {
        build_desc
            .plan
            .as_inner_mut()
            .visit_pre_mut(|expr: &mut MirRelationExpr| {
                match expr {
                    MirRelationExpr::Get {
                        id: Id::Global(global_id),
                        typ: _,
                        access_strategy: persist_or_index,
                    } => {
                        if let Some(new_idx_id) = full_scan_changes.get(global_id) {
                            match persist_or_index {
                                AccessStrategy::UnknownOrLocal => {
                                    // Should have been already filled by `collect_index_reqs`.
                                    unreachable!()
                                }
                                AccessStrategy::Persist => {
                                    // We already know that it's an indexed access.
                                    unreachable!()
                                }
                                AccessStrategy::SameDataflow => {
                                    // We have not added such annotations yet.
                                    unreachable!()
                                }
                                AccessStrategy::Index(accesses) => {
                                    for (idx_id, usage_type) in accesses {
                                        if matches!(usage_type, IndexUsageType::FullScan) {
                                            *idx_id = *new_idx_id;
                                        }
                                    }
                                }
                            }
                        }
                    }
                    _ => {}
                }
            });
    }

    // Annotate index imports by their usage types.
    dataflow_metainfo.index_usage_types = BTreeMap::new();
    for (
        index_id,
        IndexImport {
            desc: index_desc,
            typ: _,
            monotonic: _,
        },
    ) in dataflow.index_imports.iter_mut()
    {
        // A sanity check that we are not importing an index that we are also exporting.
        assert!(
            !dataflow
                .index_exports
                .iter()
                .map(|(exported_index_id, _)| exported_index_id)
                .any(|exported_index_id| exported_index_id == index_id)
        );

        let mut new_usage_types = Vec::new();
        // Let's see whether something has requested an index on this object that this imported
        // index is on.
        if let Some(index_reqs) = index_reqs_by_id.get(&index_desc.on_id) {
            for (req_idx_id, req_key, req_usage_type) in index_reqs {
                if req_idx_id == index_id {
                    soft_assert_eq_or_log!(*req_key, index_desc.key);
                    new_usage_types.push(req_usage_type.clone());
                }
            }
        }
        if !new_usage_types.is_empty() {
            dataflow_metainfo
                .index_usage_types
                .insert(*index_id, new_usage_types);
        }
    }

    // Prune index imports to only those that are used.
    dataflow
        .index_imports
        .retain(|id, _index_import| dataflow_metainfo.index_usage_types.contains_key(id));

    // Determine AccessStrategy::SameDataflow accesses. These were classified as
    // AccessStrategy::Persist inside collect_index_reqs, so now we check these, and if the id is of
    // a collection that we are building ourselves, then we adjust the access strategy.
    let mut objects_to_build_ids = BTreeSet::new();
    for BuildDesc { id, plan: _ } in dataflow.objects_to_build.iter() {
        objects_to_build_ids.insert(id.clone());
    }
    for build_desc in dataflow.objects_to_build.iter_mut() {
        build_desc
            .plan
            .as_inner_mut()
            .visit_pre_mut(|expr: &mut MirRelationExpr| match expr {
                MirRelationExpr::Get {
                    id: Id::Global(global_id),
                    typ: _,
                    access_strategy,
                } => match access_strategy {
                    AccessStrategy::Persist => {
                        if objects_to_build_ids.contains(global_id) {
                            *access_strategy = AccessStrategy::SameDataflow;
                        }
                    }
                    _ => {}
                },
                _ => {}
            });
    }

    // A sanity check that all Get annotations indicate indexes that are present in `index_imports`.
    for build_desc in dataflow.objects_to_build.iter() {
        build_desc
            .plan
            .as_inner()
            .visit_pre(|expr: &MirRelationExpr| match expr {
                MirRelationExpr::Get {
                    id: Id::Global(_),
                    typ: _,
                    access_strategy: AccessStrategy::Index(accesses),
                } => {
                    for (idx_id, _) in accesses {
                        soft_assert_or_log!(
                            dataflow.index_imports.contains_key(idx_id),
                            "Dangling Get index annotation"
                        );
                    }
                }
                _ => {}
            });
    }

    mz_repr::explain::trace_plan(dataflow);

    Ok(())
}

/// Pick an index from a given Vec of index keys.
///
/// Currently, we pick as follows:
/// - If there is an index on a unique key, then we pick that. (It might be better distributed, and
///   is less likely to get dropped than other indexes.)
/// - Otherwise, we pick an arbitrary index.
///
/// TODO: There are various edge cases where a better choice would be possible:
/// - Some indexes might be less skewed than others. (Although, picking a unique key tries to
///   capture this already.)
/// - Some indexes might have an error, while others don't.
///   <https://github.com/MaterializeInc/database-issues/issues/4455>
/// - Some indexes might have more extra data in their keys (because of being on more complicated
///   expressions than just column references), which won't be used in a full scan.
fn choose_index(
    source_keys: &BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
    id: &GlobalId,
    indexes: &Vec<(GlobalId, Vec<MirScalarExpr>)>,
) -> Option<(GlobalId, Vec<MirScalarExpr>)> {
    match source_keys.get(id) {
        None => indexes.iter().next().cloned(), // pick an arbitrary index
        Some(coll_keys) => match indexes
            .iter()
            .find(|(_idx_id, key)| coll_keys.contains(&*key))
        {
            Some((idx_id, key)) => Some((*idx_id, key.clone())),
            None => indexes.iter().next().cloned(), // pick an arbitrary index
        },
    }
}
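
// A minimal, self-contained sketch of the selection rule above, using
// hypothetical stand-in types (`u64` index ids, `Vec<usize>` keys) rather than
// the real `GlobalId` / `MirScalarExpr`: prefer an index whose key is a known
// unique key of the collection, and otherwise fall back to an arbitrary one.
#[cfg(test)]
mod choose_index_sketch {
    use std::collections::BTreeSet;

    /// Prefer an index on a unique key; otherwise pick the first index given.
    fn choose(
        unique_keys: &BTreeSet<Vec<usize>>,
        indexes: &[(u64, Vec<usize>)],
    ) -> Option<(u64, Vec<usize>)> {
        indexes
            .iter()
            .find(|(_idx_id, key)| unique_keys.contains(key))
            .or_else(|| indexes.first())
            .cloned()
    }

    #[test]
    fn prefers_an_index_on_a_unique_key() {
        let unique_keys = BTreeSet::from([vec![0]]);
        let indexes = vec![(1, vec![1]), (2, vec![0])];
        // Index 2 is on the unique key `[0]`, so it wins over index 1.
        assert_eq!(choose(&unique_keys, &indexes), Some((2, vec![0])));
        // With no unique key known, fall back to an arbitrary (first) index.
        assert_eq!(choose(&BTreeSet::new(), &indexes), Some((1, vec![1])));
    }
}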

#[derive(Debug)]
struct CollectIndexRequests<'a> {
    /// We were told about these unique keys on sources.
    source_keys: &'a BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
    /// We were told about these indexes being available.
    indexes_available: &'a dyn IndexOracle,
    /// We'll be collecting index requests here.
    index_reqs_by_id:
        &'a mut BTreeMap<GlobalId, Vec<(GlobalId, Vec<MirScalarExpr>, IndexUsageType)>>,
    /// As we recurse down a MirRelationExpr, we'll need to keep track of the context of the
    /// current node (see docs on `IndexUsageContext` about what context we keep).
    /// Moreover, we need to propagate this context from cte uses to cte definitions.
    /// `context_across_lets` will keep track of the contexts that reached each use of a LocalId
    /// added together.
    context_across_lets: BTreeMap<LocalId, Vec<IndexUsageContext>>,
    recursion_guard: RecursionGuard,
}

impl<'a> CheckedRecursion for CollectIndexRequests<'a> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}

impl<'a> CollectIndexRequests<'a> {
    fn new(
        source_keys: &'a BTreeMap<GlobalId, BTreeSet<Vec<MirScalarExpr>>>,
        indexes_available: &'a dyn IndexOracle,
        index_reqs_by_id: &'a mut BTreeMap<
            GlobalId,
            Vec<(GlobalId, Vec<MirScalarExpr>, IndexUsageType)>,
        >,
    ) -> CollectIndexRequests<'a> {
        CollectIndexRequests {
            source_keys,
            indexes_available,
            index_reqs_by_id,
            context_across_lets: BTreeMap::new(),
            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
        }
    }

    pub fn collect_index_reqs(
        &mut self,
        expr: &mut MirRelationExpr,
    ) -> Result<(), RecursionLimitError> {
        assert!(self.context_across_lets.is_empty());
        self.collect_index_reqs_inner(
            expr,
            &IndexUsageContext::from_usage_type(IndexUsageType::PlanRootNoArrangement),
        )?;
        assert!(self.context_across_lets.is_empty());
        // Sanity check that we don't have any `DeltaJoinIndexUsageType::Unknown` remaining.
        for (_id, index_reqs) in self.index_reqs_by_id.iter() {
            for (_, _, index_usage_type) in index_reqs {
                soft_assert_or_log!(
                    !matches!(
                        index_usage_type,
                        IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Unknown)
                    ),
                    "Delta join Unknown index usage remained"
                );
            }
        }
        Ok(())
    }

    fn collect_index_reqs_inner(
        &mut self,
        expr: &mut MirRelationExpr,
        contexts: &Vec<IndexUsageContext>,
    ) -> Result<(), RecursionLimitError> {
        self.checked_recur_mut(|this| {
            // If an index exists on `on_id`, this function picks an index to be fully scanned.
            let pick_index_for_full_scan = |on_id: &GlobalId| {
                // Note that the choice we make here might be modified later at the
                // "Adjust FullScans to not introduce a new index dependency" step.
                choose_index(
                    this.source_keys,
                    on_id,
                    &this
                        .indexes_available
                        .indexes_on(*on_id)
                        .map(|(idx_id, key)| (idx_id, key.iter().cloned().collect_vec()))
                        .collect_vec(),
                )
            };

            // See comment on `IndexUsageContext`.
            Ok(match expr {
                MirRelationExpr::Join {
                    inputs,
                    implementation,
                    ..
                } => {
                    match implementation {
                        JoinImplementation::Differential(..) => {
                            for input in inputs {
                                this.collect_index_reqs_inner(
                                    input,
                                    &IndexUsageContext::from_usage_type(
                                        IndexUsageType::DifferentialJoin,
                                    ),
                                )?;
                            }
                        }
                        JoinImplementation::DeltaQuery(..) => {
                            // For Delta joins, the first input is special, see
                            // https://github.com/MaterializeInc/database-issues/issues/2115
                            this.collect_index_reqs_inner(
                                &mut inputs[0],
                                &IndexUsageContext::from_usage_type(IndexUsageType::DeltaJoin(
                                    DeltaJoinIndexUsageType::Unknown,
                                )),
                            )?;
                            for input in &mut inputs[1..] {
                                this.collect_index_reqs_inner(
                                    input,
                                    &IndexUsageContext::from_usage_type(IndexUsageType::DeltaJoin(
                                        DeltaJoinIndexUsageType::Lookup,
                                    )),
                                )?;
                            }
                        }
                        JoinImplementation::IndexedFilter(_coll_id, idx_id, ..) => {
                            for input in inputs {
                                this.collect_index_reqs_inner(
                                    input,
                                    &IndexUsageContext::from_usage_type(IndexUsageType::Lookup(
                                        *idx_id,
                                    )),
                                )?;
                            }
                        }
                        JoinImplementation::Unimplemented => {
                            soft_panic_or_log!(
                                "CollectIndexRequests encountered an Unimplemented join"
                            );
                        }
                    }
                }
                MirRelationExpr::ArrangeBy { input, keys } => {
                    this.collect_index_reqs_inner(
                        input,
                        &IndexUsageContext::add_keys(contexts, keys),
                    )?;
                }
                MirRelationExpr::Get {
                    id: Id::Global(global_id),
                    access_strategy: persist_or_index,
                    ..
                } => {
                    this.index_reqs_by_id
                        .entry(*global_id)
                        .or_insert_with(Vec::new);
                    // If the context is empty, it means we didn't see an operator that would
                    // specifically want to use an index for this Get. However, let's still try to
                    // find an index for a full scan.
                    let mut try_full_scan = contexts.is_empty();
                    let mut index_accesses = Vec::new();
                    for context in contexts {
                        match &context.requested_keys {
                            None => {
                                // We have some index usage context, but didn't see an `ArrangeBy`.
                                try_full_scan = true;
                                match context.usage_type {
                                    IndexUsageType::FullScan
                                    | IndexUsageType::SinkExport
                                    | IndexUsageType::IndexExport => {
                                        // Not possible, because these don't go through
                                        // IndexUsageContext at all.
                                        unreachable!()
                                    }
                                    // You can find more info on why the following join cases
                                    // shouldn't happen in comments of the Join lowering to LIR.
                                    IndexUsageType::Lookup(_) => soft_panic_or_log!(
                                        "CollectIndexRequests encountered an IndexedFilter join without an ArrangeBy"
                                    ),
                                    IndexUsageType::DifferentialJoin => soft_panic_or_log!(
                                        "CollectIndexRequests encountered a Differential join without an ArrangeBy"
                                    ),
                                    IndexUsageType::DeltaJoin(_) => soft_panic_or_log!(
                                        "CollectIndexRequests encountered a Delta join without an ArrangeBy"
                                    ),
                                    IndexUsageType::PlanRootNoArrangement => {
                                        // This is ok: the entire plan is a `Get`, with not even an
                                        // `ArrangeBy`. Note that if an index exists, the usage will
                                        // be saved as `FullScan` (NOT as `PlanRootNoArrangement`),
                                        // because we are going into the `try_full_scan` if.
                                    }
                                    IndexUsageType::FastPathLimit => {
                                        // These are created much later, not even inside
                                        // `prune_and_annotate_dataflow_index_imports`.
                                        unreachable!()
                                    }
                                    IndexUsageType::DanglingArrangeBy => {
                                        // Not possible, because we create `DanglingArrangeBy`
                                        // only when we see an `ArrangeBy`.
                                        unreachable!()
                                    }
                                    IndexUsageType::Unknown => {
                                        // These are added only after `CollectIndexRequests` has run.
                                        unreachable!()
                                    }
                                }
                            }
                            Some(requested_keys) => {
                                for requested_key in requested_keys {
                                    match this
                                        .indexes_available
                                        .indexes_on(*global_id)
                                        .find(|(available_idx_id, available_key)| {
                                            match context.usage_type {
                                                IndexUsageType::Lookup(req_idx_id) => {
                                                    // `LiteralConstraints` already picked an index
                                                    // by id. Let's use that one.
                                                    assert!(
                                                        !(available_idx_id == &req_idx_id
                                                            && available_key != &requested_key)
                                                    );
                                                    available_idx_id == &req_idx_id
                                                }
                                                _ => available_key == &requested_key,
                                            }
                                        })
                                    {
                                        Some((idx_id, key)) => {
                                            this.index_reqs_by_id
                                                .get_mut(global_id)
                                                .unwrap()
                                                .push((
                                                    idx_id,
                                                    key.to_owned(),
                                                    context.usage_type.clone(),
                                                ));
                                            index_accesses
                                                .push((idx_id, context.usage_type.clone()));
                                        }
                                        None => {
                                            // If there is a key requested for which we don't have
                                            // an index, then we might still be able to do a full
                                            // scan of a differently keyed index.
                                            try_full_scan = true;
                                        }
                                    }
                                }
                                if requested_keys.is_empty() {
                                    // It's a bit weird if an MIR ArrangeBy is not requesting any
                                    // key, but let's try a full scan in that case anyhow.
                                    try_full_scan = true;
                                }
                            }
                        }
                    }
                    if try_full_scan {
                        // Keep in mind that when having 2 contexts coming from 2 uses of a Let,
                        // this code can't distinguish between the case when there is 1 ArrangeBy
                        // at the top of the Let, or when the 2 uses each have an `ArrangeBy`. In
                        // both cases, we'll add only 1 full scan, which would be wrong in the
                        // latter case. However, the latter case can't currently happen until we do
                        // https://github.com/MaterializeInc/database-issues/issues/6363
                        // Also note that currently we are deduplicating index usage types when
                        // printing index usages in EXPLAIN.
                        if let Some((idx_id, key)) = pick_index_for_full_scan(global_id) {
                            this.index_reqs_by_id
                                .get_mut(global_id)
                                .unwrap()
                                .push((idx_id, key.to_owned(), IndexUsageType::FullScan));
                            index_accesses.push((idx_id, IndexUsageType::FullScan));
                        }
                    }
                    if index_accesses.is_empty() {
                        *persist_or_index = AccessStrategy::Persist;
                    } else {
                        *persist_or_index = AccessStrategy::Index(index_accesses);
                    }
                }
                MirRelationExpr::Get {
                    id: Id::Local(local_id),
                    ..
                } => {
                    // Add the current context to the vector of contexts of `local_id`.
                    // (The unwrap is safe, because the Let and LetRec cases start with inserting an
                    // empty entry.)
                    this.context_across_lets
                        .get_mut(local_id)
                        .unwrap()
                        .extend(contexts.iter().cloned());
                    // No recursive call here, because Get has no inputs.
                }
                MirRelationExpr::Let { id, value, body } => {
                    let shadowed_context = this.context_across_lets.insert(id.clone(), Vec::new());
                    // No shadowing in MIR
                    assert_none!(shadowed_context);
                    // We go backwards: Recurse on the body and then the value.
                    this.collect_index_reqs_inner(body, contexts)?;
                    // The above call filled in the entry for `id` in `context_across_lets` (if it
                    // was referenced). Anyhow, at least an empty entry should exist, because we
                    // started above with inserting it.
                    this.collect_index_reqs_inner(value, &this.context_across_lets[id].clone())?;
                    // Clean up the id from the saved contexts.
                    this.context_across_lets.remove(id);
                }
                MirRelationExpr::LetRec {
                    ids,
                    values,
                    limits: _,
                    body,
                } => {
                    for id in ids.iter() {
                        let shadowed_context =
                            this.context_across_lets.insert(id.clone(), Vec::new());
                        assert_none!(shadowed_context); // No shadowing in MIR
                    }
                    // We go backwards: Recurse on the body first.
                    this.collect_index_reqs_inner(body, contexts)?;
                    // Reset the contexts of the ids (of the current LetRec), because an arrangement
                    // from a value can't be used in the body.
                    for id in ids.iter() {
                        *this.context_across_lets.get_mut(id).unwrap() = Vec::new();
                    }
                    // Recurse on the values in reverse order.
                    // Note that we do only one pass, i.e., we won't see context through a Get that
                    // refers to the previous iteration. But this is ok, because we can't reuse
                    // arrangements across iterations anyway.
                    for (id, value) in ids.iter().zip(values.iter_mut()).rev() {
                        this.collect_index_reqs_inner(
                            value,
                            &this.context_across_lets[id].clone(),
                        )?;
                    }
                    // Clean up the ids from the saved contexts.
                    for id in ids {
                        this.context_across_lets.remove(id);
                    }
                }
                _ => {
                    // Nothing interesting at this node, recurse with the empty context (regardless
                    // of what context we got from above).
                    let empty_context = Vec::new();
                    for child in expr.children_mut() {
                        this.collect_index_reqs_inner(child, &empty_context)?;
                    }
                }
            })
        })
    }
}

/// This struct will save info about parent nodes as we are descending down a `MirRelationExpr`.
/// We always start with filling in `usage_type` when we see an operation that uses an arrangement,
/// and then we fill in `requested_keys` when we see an `ArrangeBy`. So, the pattern that we are
/// looking for is
/// ```text
/// <operation that uses an index>
///   ArrangeBy <requested_keys>
///     Get <global_id>
/// ```
/// When we reach a `Get` to a global id, we access this context struct to see if the rest of the
/// pattern is present above the `Get`.
///
/// Note that we usually put this struct in a Vec, because we track context across local let
/// bindings, which means that a node can have multiple parents.
#[derive(Debug, Clone)]
struct IndexUsageContext {
    usage_type: IndexUsageType,
    requested_keys: Option<BTreeSet<Vec<MirScalarExpr>>>,
}

impl IndexUsageContext {
    pub fn from_usage_type(usage_type: IndexUsageType) -> Vec<Self> {
        vec![IndexUsageContext {
            usage_type,
            requested_keys: None,
        }]
    }

    // Add the keys of an ArrangeBy into the contexts.
    // Soft-panics if we haven't already seen something that indicates what the
    // index will be used for.
    pub fn add_keys(
        old_contexts: &Vec<IndexUsageContext>,
        keys_to_add: &Vec<Vec<MirScalarExpr>>,
    ) -> Vec<IndexUsageContext> {
        let old_contexts = if old_contexts.is_empty() {
            // No join above us, and we are not at the root. Why does this ArrangeBy even exist?
            soft_panic_or_log!("CollectIndexRequests encountered a dangling ArrangeBy");
            // Anyhow, let's create a context with a `DanglingArrangeBy` index usage, so that we
            // have a place to note down the requested keys below.
            IndexUsageContext::from_usage_type(IndexUsageType::DanglingArrangeBy)
        } else {
            old_contexts.clone()
        };
        old_contexts
            .into_iter()
            .flat_map(|old_context| {
                if !matches!(
                    old_context.usage_type,
                    IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Unknown)
                ) {
                    // If it's not an unknown delta join usage, then we simply note down the new
                    // keys into `requested_keys`.
                    let mut context = old_context.clone();
                    if context.requested_keys.is_none() {
                        context.requested_keys = Some(BTreeSet::new());
                    }
                    context
                        .requested_keys
                        .as_mut()
                        .unwrap()
                        .extend(keys_to_add.iter().cloned());
                    Some(context).into_iter().chain(None)
                } else {
                    // If it's an unknown delta join usage, then we need to figure out whether this
                    // is a full scan or a lookup.
                    //
                    // `source_key` in `DeltaPathPlan` determines which arrangement we are going to
                    // scan when starting the rendering of a delta path. This is the one for which
                    // we want a `DeltaJoinIndexUsageType::FirstInputFullScan`.
                    //
                    // However, `DeltaPathPlan` is an LIR concept, and here we need to figure out
                    // the `source_key` based on the MIR plan. We do this by doing the same as
                    // `DeltaJoinPlan::create_from`: choose the smallest key (by `Ord`).
                    let source_key = keys_to_add
                        .iter()
                        .min()
                        .expect("ArrangeBy below a delta join has at least one key");
                    let full_scan_context = IndexUsageContext {
                        requested_keys: Some(BTreeSet::from([source_key.clone()])),
                        usage_type: IndexUsageType::DeltaJoin(
                            DeltaJoinIndexUsageType::FirstInputFullScan,
                        ),
                    };
                    let lookup_keys = keys_to_add
                        .into_iter()
                        .filter(|key| *key != source_key)
                        .cloned()
                        .collect_vec();
                    if lookup_keys.is_empty() {
                        Some(full_scan_context).into_iter().chain(None)
                    } else {
                        let lookup_context = IndexUsageContext {
                            requested_keys: Some(lookup_keys.into_iter().collect()),
                            usage_type: IndexUsageType::DeltaJoin(DeltaJoinIndexUsageType::Lookup),
                        };
                        Some(full_scan_context)
                            .into_iter()
                            .chain(Some(lookup_context))
                    }
                }
            })
            .collect()
    }
}

/// Extra information about the dataflow. This is not going to be shipped, but has to be processed
/// in other ways, e.g., showing notices to the user, or saving meta-information to the catalog.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct DataflowMetainfo<Notice = RawOptimizerNotice> {
    /// Notices that the optimizer wants to show to users.
    /// For pushing a new element, use [`Self::push_optimizer_notice_dedup`].
    pub optimizer_notices: Vec<Notice>,
    /// What kind of operation (full scan, lookup, ...) will access each index. Computed by
    /// `prune_and_annotate_dataflow_index_imports`.
    pub index_usage_types: BTreeMap<GlobalId, Vec<IndexUsageType>>,
}

impl Default for DataflowMetainfo {
    fn default() -> Self {
        DataflowMetainfo {
            optimizer_notices: Vec::new(),
            index_usage_types: BTreeMap::new(),
        }
    }
}

impl<Notice> DataflowMetainfo<Notice> {
    /// Create a [`UsedIndexes`] instance by resolving each index import id of
    /// `df_desc` against an entry expected to exist in
    /// [`DataflowMetainfo::index_usage_types`].
    pub fn used_indexes<T>(&self, df_desc: &DataflowDescription<T>) -> UsedIndexes {
        UsedIndexes::new(
            df_desc
                .index_imports
                .iter()
                .map(|(id, _)| {
                    let entry = self.index_usage_types.get(id).cloned();
                    // If an entry does not exist, mark the usage type for this
                    // index as `Unknown`.
                    //
                    // This should never happen if this method is called after
                    // running `prune_and_annotate_dataflow_index_imports` on
                    // the dataflow (this happens at the end of the
                    // `optimize_dataflow` call).
                    let index_usage_type = entry.unwrap_or_else(|| vec![IndexUsageType::Unknown]);

                    (*id, index_usage_type)
                })
                .collect(),
        )
    }
}

impl DataflowMetainfo<RawOptimizerNotice> {
    /// Pushes a [`RawOptimizerNotice`] into [`Self::optimizer_notices`], but
    /// only if the exact same notice is not already present.
    pub fn push_optimizer_notice_dedup<T>(&mut self, notice: T)
    where
        T: Into<RawOptimizerNotice>,
    {
        let notice = notice.into();
        if !self.optimizer_notices.contains(&notice) {
            self.optimizer_notices.push(notice);
        }
    }
}