mz_compute/render/
sinks.rs1use std::any::Any;
13use std::collections::{BTreeMap, BTreeSet};
14use std::rc::Rc;
15
16use differential_dataflow::VecCollection;
17use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc};
18use mz_expr::{EvalError, MapFilterProject, permutation_for_arrangement};
19use mz_ore::soft_assert_or_log;
20use mz_ore::str::StrExt;
21use mz_ore::vec::PartialOrdVecExt;
22use mz_repr::{Diff, GlobalId, Row};
23use mz_storage_types::controller::CollectionMetadata;
24use mz_timely_util::operator::CollectionExt;
25use mz_timely_util::probe::Handle;
26use timely::container::CapacityContainerBuilder;
27use timely::dataflow::Scope;
28use timely::progress::Antichain;
29
30use crate::compute_state::SinkToken;
31use crate::logging::compute::LogDataflowErrors;
32use crate::render::context::Context;
33use crate::render::errors::DataflowErrorSer;
34use crate::render::{RenderTimestamp, StartSignal};
35
36impl<'g, T: RenderTimestamp> Context<'g, T> {
37 pub(crate) fn export_sink(
39 &self,
40 compute_state: &mut crate::compute_state::ComputeState,
41 tokens: &BTreeMap<GlobalId, Rc<dyn Any>>,
42 dependency_ids: BTreeSet<GlobalId>,
43 sink_id: GlobalId,
44 sink: &ComputeSinkDesc<CollectionMetadata>,
45 start_signal: StartSignal,
46 output_probe: &Handle<mz_repr::Timestamp>,
47 outer_scope: Scope<'g, mz_repr::Timestamp>,
48 ) {
49 soft_assert_or_log!(
50 sink.non_null_assertions.is_strictly_sorted(),
51 "non-null assertions not sorted"
52 );
53
54 let mut needed_tokens = Vec::new();
56 for dep_id in dependency_ids {
57 if let Some(token) = tokens.get(&dep_id) {
58 needed_tokens.push(Rc::clone(token))
59 }
60 }
61
62 let bundle = self
67 .lookup_id(mz_expr::Id::Global(sink.from))
68 .expect("Sink source collection not loaded");
69 let (ok_collection, mut err_collection) = if let Some(collection) = &bundle.collection {
70 collection.clone()
71 } else {
72 let (key, _arrangement) = bundle
73 .arranged
74 .iter()
75 .next()
76 .expect("Invariant violated: at least one collection must be present.");
77 let unthinned_arity = sink.from_desc.arity();
78 let (permutation, thinning) = permutation_for_arrangement(key, unthinned_arity);
79 let mut mfp = MapFilterProject::new(unthinned_arity);
80 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
81 bundle.as_collection_core(
82 mfp,
83 Some((key.clone(), None)),
84 self.until.clone(),
85 &self.config_set,
86 )
87 };
88
89 if let Some(logger) = compute_state.compute_logger.clone() {
91 err_collection = err_collection.log_dataflow_errors(logger, sink_id);
92 }
93
94 let mut ok_collection = ok_collection.leave(outer_scope);
95 let mut err_collection = err_collection.leave(outer_scope);
96
97 if let Some(&expiration) = self.dataflow_expiration.as_option() {
100 ok_collection = ok_collection
101 .expire_collection_at(&format!("{}_export_sink_oks", self.debug_name), expiration);
102 err_collection = err_collection
103 .expire_collection_at(&format!("{}_export_sink_errs", self.debug_name), expiration);
104 }
105
106 let non_null_assertions = sink.non_null_assertions.clone();
107 let from_desc = sink.from_desc.clone();
108 if !non_null_assertions.is_empty() {
109 let name = format!("NullAssertions({sink_id:?})");
110 type CB<C> = CapacityContainerBuilder<C>;
111 let (oks, null_errs) =
112 ok_collection.map_fallible::<CB<_>, CB<_>, _, _, _>(&name, move |row| {
113 let mut idx = 0;
114 let mut iter = row.iter();
115 for &i in &non_null_assertions {
116 let skip = i - idx;
117 let datum = iter.nth(skip).unwrap();
118 idx += skip + 1;
119 if datum.is_null() {
120 return Err(DataflowErrorSer::from(EvalError::MustNotBeNull(
121 format!("column {}", from_desc.get_name(i).quoted()).into(),
122 )));
123 }
124 }
125 Ok(row)
126 });
127 ok_collection = oks;
128 err_collection = err_collection.concat(null_errs);
129 }
130
131 let region_name = match sink.connection {
132 ComputeSinkConnection::Subscribe(_) => format!("SubscribeSink({:?})", sink_id),
133 ComputeSinkConnection::MaterializedView(_) => {
134 format!("MaterializedViewSink({:?})", sink_id)
135 }
136 ComputeSinkConnection::CopyToS3Oneshot(_) => {
137 format!("CopyToS3OneshotSink({:?})", sink_id)
138 }
139 };
140 outer_scope.clone().region_named(®ion_name, |inner| {
141 let sink_render = get_sink_render_for(&sink.connection);
142
143 let sink_token = sink_render.render_sink(
144 compute_state,
145 sink,
146 sink_id,
147 self.as_of_frontier.clone(),
148 start_signal,
149 ok_collection.enter_region(inner),
150 err_collection.enter_region(inner),
151 output_probe,
152 );
153
154 if let Some(sink_token) = sink_token {
155 needed_tokens.push(sink_token);
156 }
157
158 let collection = compute_state.expect_collection_mut(sink_id);
159 collection.sink_token = Some(SinkToken::new(Box::new(needed_tokens)));
160 });
161 }
162}
163
164pub(crate) trait SinkRender<'scope> {
166 fn render_sink(
167 &self,
168 compute_state: &mut crate::compute_state::ComputeState,
169 sink: &ComputeSinkDesc<CollectionMetadata>,
170 sink_id: GlobalId,
171 as_of: Antichain<mz_repr::Timestamp>,
172 start_signal: StartSignal,
173 sinked_collection: VecCollection<'scope, mz_repr::Timestamp, Row, Diff>,
174 err_collection: VecCollection<'scope, mz_repr::Timestamp, DataflowErrorSer, Diff>,
175 output_probe: &Handle<mz_repr::Timestamp>,
176 ) -> Option<Rc<dyn Any>>;
177}
178
179fn get_sink_render_for<'scope>(
180 connection: &ComputeSinkConnection<CollectionMetadata>,
181) -> Box<dyn SinkRender<'scope>> {
182 match connection {
183 ComputeSinkConnection::Subscribe(connection) => Box::new(connection.clone()),
184 ComputeSinkConnection::MaterializedView(connection) => Box::new(connection.clone()),
185 ComputeSinkConnection::CopyToS3Oneshot(connection) => Box::new(connection.clone()),
186 }
187}