1use std::any::Any;
13use std::collections::{BTreeMap, BTreeSet};
14use std::rc::{Rc, Weak};
15
16use differential_dataflow::Collection;
17use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc};
18use mz_expr::{EvalError, MapFilterProject, permutation_for_arrangement};
19use mz_ore::soft_assert_or_log;
20use mz_ore::str::StrExt;
21use mz_ore::vec::PartialOrdVecExt;
22use mz_repr::{Diff, GlobalId, Row, Timestamp};
23use mz_storage_types::controller::CollectionMetadata;
24use mz_storage_types::errors::DataflowError;
25use mz_timely_util::operator::CollectionExt;
26use mz_timely_util::probe::Handle;
27use timely::container::CapacityContainerBuilder;
28use timely::dataflow::Scope;
29use timely::dataflow::scopes::Child;
30use timely::progress::Antichain;
31
32use crate::compute_state::SinkToken;
33use crate::logging::compute::LogDataflowErrors;
34use crate::render::context::Context;
35use crate::render::{RenderTimestamp, StartSignal};
36
37impl<'g, G, T> Context<Child<'g, G, T>>
38where
39 G: Scope<Timestamp = mz_repr::Timestamp>,
40 T: RenderTimestamp,
41{
42 pub(crate) fn export_sink(
44 &self,
45 compute_state: &mut crate::compute_state::ComputeState,
46 tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
47 dependency_ids: BTreeSet<GlobalId>,
48 sink_id: GlobalId,
49 sink: &ComputeSinkDesc<CollectionMetadata>,
50 start_signal: StartSignal,
51 ct_times: Option<Collection<G, (), Diff>>,
52 output_probe: &Handle<Timestamp>,
53 ) {
54 soft_assert_or_log!(
55 sink.non_null_assertions.is_strictly_sorted(),
56 "non-null assertions not sorted"
57 );
58
59 let mut needed_tokens = Vec::new();
61 for dep_id in dependency_ids {
62 if let Some(token) = tokens.get(&dep_id) {
63 needed_tokens.push(Rc::clone(token))
64 }
65 }
66
67 let bundle = self
72 .lookup_id(mz_expr::Id::Global(sink.from))
73 .expect("Sink source collection not loaded");
74 let (ok_collection, mut err_collection) = if let Some(collection) = &bundle.collection {
75 collection.clone()
76 } else {
77 let (key, _arrangement) = bundle
78 .arranged
79 .iter()
80 .next()
81 .expect("Invariant violated: at least one collection must be present.");
82 let unthinned_arity = sink.from_desc.arity();
83 let (permutation, thinning) = permutation_for_arrangement(key, unthinned_arity);
84 let mut mfp = MapFilterProject::new(unthinned_arity);
85 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
86 bundle.as_collection_core(
87 mfp,
88 Some((key.clone(), None)),
89 self.until.clone(),
90 &self.config_set,
91 )
92 };
93
94 if let Some(logger) = compute_state.compute_logger.clone() {
96 err_collection = err_collection.log_dataflow_errors(logger, sink_id);
97 }
98
99 let mut ok_collection = ok_collection.leave();
100 let mut err_collection = err_collection.leave();
101
102 if let Some(&expiration) = self.dataflow_expiration.as_option() {
105 let token = Rc::new(());
106 let shutdown_token = Rc::downgrade(&token);
107 ok_collection = ok_collection.expire_collection_at(
108 &format!("{}_export_sink_oks", self.debug_name),
109 expiration,
110 Weak::clone(&shutdown_token),
111 );
112 err_collection = err_collection.expire_collection_at(
113 &format!("{}_export_sink_errs", self.debug_name),
114 expiration,
115 shutdown_token,
116 );
117 needed_tokens.push(token);
118 }
119
120 let non_null_assertions = sink.non_null_assertions.clone();
121 let from_desc = sink.from_desc.clone();
122 if !non_null_assertions.is_empty() {
123 let name = format!("NullAssertions({sink_id:?})");
124 type CB<C> = CapacityContainerBuilder<C>;
125 let (oks, null_errs) =
126 ok_collection.map_fallible::<CB<_>, CB<_>, _, _, _>(&name, move |row| {
127 let mut idx = 0;
128 let mut iter = row.iter();
129 for &i in &non_null_assertions {
130 let skip = i - idx;
131 let datum = iter.nth(skip).unwrap();
132 idx += skip + 1;
133 if datum.is_null() {
134 return Err(DataflowError::EvalError(Box::new(
135 EvalError::MustNotBeNull(
136 format!("column {}", from_desc.get_name(i).quoted()).into(),
137 ),
138 )));
139 }
140 }
141 Ok(row)
142 });
143 ok_collection = oks;
144 err_collection = err_collection.concat(&null_errs);
145 }
146
147 let region_name = match sink.connection {
148 ComputeSinkConnection::Subscribe(_) => format!("SubscribeSink({:?})", sink_id),
149 ComputeSinkConnection::MaterializedView(_) => {
150 format!("MaterializedViewSink({:?})", sink_id)
151 }
152 ComputeSinkConnection::ContinualTask(_) => {
153 format!("ContinualTaskSink({:?})", sink_id)
154 }
155 ComputeSinkConnection::CopyToS3Oneshot(_) => {
156 format!("CopyToS3OneshotSink({:?})", sink_id)
157 }
158 };
159 self.scope
160 .parent
161 .clone()
162 .region_named(®ion_name, |inner| {
163 let sink_render = get_sink_render_for::<_>(&sink.connection);
164
165 let sink_token = sink_render.render_sink(
166 compute_state,
167 sink,
168 sink_id,
169 self.as_of_frontier.clone(),
170 start_signal,
171 ok_collection.enter_region(inner),
172 err_collection.enter_region(inner),
173 ct_times.map(|x| x.enter_region(inner)),
174 output_probe,
175 );
176
177 if let Some(sink_token) = sink_token {
178 needed_tokens.push(sink_token);
179 }
180
181 let collection = compute_state.expect_collection_mut(sink_id);
182 collection.sink_token = Some(SinkToken::new(Box::new(needed_tokens)));
183 });
184 }
185}
186
187pub(crate) trait SinkRender<G>
189where
190 G: Scope<Timestamp = mz_repr::Timestamp>,
191{
192 fn render_sink(
193 &self,
194 compute_state: &mut crate::compute_state::ComputeState,
195 sink: &ComputeSinkDesc<CollectionMetadata>,
196 sink_id: GlobalId,
197 as_of: Antichain<mz_repr::Timestamp>,
198 start_signal: StartSignal,
199 sinked_collection: Collection<G, Row, Diff>,
200 err_collection: Collection<G, DataflowError, Diff>,
201 ct_times: Option<Collection<G, (), Diff>>,
204 output_probe: &Handle<Timestamp>,
205 ) -> Option<Rc<dyn Any>>;
206}
207
208fn get_sink_render_for<G>(
209 connection: &ComputeSinkConnection<CollectionMetadata>,
210) -> Box<dyn SinkRender<G>>
211where
212 G: Scope<Timestamp = mz_repr::Timestamp>,
213{
214 match connection {
215 ComputeSinkConnection::Subscribe(connection) => Box::new(connection.clone()),
216 ComputeSinkConnection::MaterializedView(connection) => Box::new(connection.clone()),
217 ComputeSinkConnection::ContinualTask(connection) => Box::new(connection.clone()),
218 ComputeSinkConnection::CopyToS3Oneshot(connection) => Box::new(connection.clone()),
219 }
220}