1use std::any::Any;
13use std::collections::{BTreeMap, BTreeSet};
14use std::rc::{Rc, Weak};
15
16use columnar::Columnar;
17use differential_dataflow::Collection;
18use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc};
19use mz_expr::{EvalError, MapFilterProject, permutation_for_arrangement};
20use mz_ore::soft_assert_or_log;
21use mz_ore::str::StrExt;
22use mz_ore::vec::PartialOrdVecExt;
23use mz_repr::{Diff, GlobalId, Row, Timestamp};
24use mz_storage_types::controller::CollectionMetadata;
25use mz_storage_types::errors::DataflowError;
26use mz_timely_util::operator::CollectionExt;
27use mz_timely_util::probe::Handle;
28use timely::container::CapacityContainerBuilder;
29use timely::dataflow::Scope;
30use timely::dataflow::scopes::Child;
31use timely::progress::Antichain;
32
33use crate::compute_state::SinkToken;
34use crate::logging::compute::LogDataflowErrors;
35use crate::render::context::Context;
36use crate::render::{RenderTimestamp, StartSignal};
37
38impl<'g, G, T> Context<Child<'g, G, T>>
39where
40 G: Scope<Timestamp = mz_repr::Timestamp>,
41 T: RenderTimestamp,
42 <T as Columnar>::Container: Clone + Send,
43 for<'a> <T as Columnar>::Ref<'a>: Ord + Copy,
44{
45 pub(crate) fn export_sink(
47 &self,
48 compute_state: &mut crate::compute_state::ComputeState,
49 tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
50 dependency_ids: BTreeSet<GlobalId>,
51 sink_id: GlobalId,
52 sink: &ComputeSinkDesc<CollectionMetadata>,
53 start_signal: StartSignal,
54 ct_times: Option<Collection<G, (), Diff>>,
55 output_probe: &Handle<Timestamp>,
56 ) {
57 soft_assert_or_log!(
58 sink.non_null_assertions.is_strictly_sorted(),
59 "non-null assertions not sorted"
60 );
61
62 let mut needed_tokens = Vec::new();
64 for dep_id in dependency_ids {
65 if let Some(token) = tokens.get(&dep_id) {
66 needed_tokens.push(Rc::clone(token))
67 }
68 }
69
70 let bundle = self
75 .lookup_id(mz_expr::Id::Global(sink.from))
76 .expect("Sink source collection not loaded");
77 let (ok_collection, mut err_collection) = if let Some(collection) = &bundle.collection {
78 collection.clone()
79 } else {
80 let (key, _arrangement) = bundle
81 .arranged
82 .iter()
83 .next()
84 .expect("Invariant violated: at least one collection must be present.");
85 let unthinned_arity = sink.from_desc.arity();
86 let (permutation, thinning) = permutation_for_arrangement(key, unthinned_arity);
87 let mut mfp = MapFilterProject::new(unthinned_arity);
88 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
89 bundle.as_collection_core(
90 mfp,
91 Some((key.clone(), None)),
92 self.until.clone(),
93 &self.config_set,
94 )
95 };
96
97 if let Some(logger) = compute_state.compute_logger.clone() {
99 err_collection = err_collection.log_dataflow_errors(logger, sink_id);
100 }
101
102 let mut ok_collection = ok_collection.leave();
103 let mut err_collection = err_collection.leave();
104
105 if let Some(&expiration) = self.dataflow_expiration.as_option() {
108 let token = Rc::new(());
109 let shutdown_token = Rc::downgrade(&token);
110 ok_collection = ok_collection.expire_collection_at(
111 &format!("{}_export_sink_oks", self.debug_name),
112 expiration,
113 Weak::clone(&shutdown_token),
114 );
115 err_collection = err_collection.expire_collection_at(
116 &format!("{}_export_sink_errs", self.debug_name),
117 expiration,
118 shutdown_token,
119 );
120 needed_tokens.push(token);
121 }
122
123 let non_null_assertions = sink.non_null_assertions.clone();
124 let from_desc = sink.from_desc.clone();
125 if !non_null_assertions.is_empty() {
126 let name = format!("NullAssertions({sink_id:?})");
127 type CB<C> = CapacityContainerBuilder<C>;
128 let (oks, null_errs) =
129 ok_collection.map_fallible::<CB<_>, CB<_>, _, _, _>(&name, move |row| {
130 let mut idx = 0;
131 let mut iter = row.iter();
132 for &i in &non_null_assertions {
133 let skip = i - idx;
134 let datum = iter.nth(skip).unwrap();
135 idx += skip + 1;
136 if datum.is_null() {
137 return Err(DataflowError::EvalError(Box::new(
138 EvalError::MustNotBeNull(
139 format!("column {}", from_desc.get_name(i).as_str().quoted())
140 .into(),
141 ),
142 )));
143 }
144 }
145 Ok(row)
146 });
147 ok_collection = oks;
148 err_collection = err_collection.concat(&null_errs);
149 }
150
151 let region_name = match sink.connection {
152 ComputeSinkConnection::Subscribe(_) => format!("SubscribeSink({:?})", sink_id),
153 ComputeSinkConnection::MaterializedView(_) => {
154 format!("MaterializedViewSink({:?})", sink_id)
155 }
156 ComputeSinkConnection::ContinualTask(_) => {
157 format!("ContinualTaskSink({:?})", sink_id)
158 }
159 ComputeSinkConnection::CopyToS3Oneshot(_) => {
160 format!("CopyToS3OneshotSink({:?})", sink_id)
161 }
162 };
163 self.scope
164 .parent
165 .clone()
166 .region_named(®ion_name, |inner| {
167 let sink_render = get_sink_render_for::<_>(&sink.connection);
168
169 let sink_token = sink_render.render_sink(
170 compute_state,
171 sink,
172 sink_id,
173 self.as_of_frontier.clone(),
174 start_signal,
175 ok_collection.enter_region(inner),
176 err_collection.enter_region(inner),
177 ct_times.map(|x| x.enter_region(inner)),
178 output_probe,
179 );
180
181 if let Some(sink_token) = sink_token {
182 needed_tokens.push(sink_token);
183 }
184
185 let collection = compute_state.expect_collection_mut(sink_id);
186 collection.sink_token = Some(SinkToken::new(Box::new(needed_tokens)));
187 });
188 }
189}
190
/// A type that can be rendered as a dataflow sink.
///
/// `get_sink_render_for` boxes a clone of each `ComputeSinkConnection` variant's payload
/// as a `dyn SinkRender<G>`, so each of those payload types is expected to implement
/// this trait.
pub(crate) trait SinkRender<G>
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    /// Renders this sink into the scope `G`.
    ///
    /// As called from `export_sink`:
    ///
    /// * `compute_state`: the mutable compute state of the caller.
    /// * `sink`, `sink_id`: the description and id of the sink being rendered.
    /// * `as_of`: a clone of the rendering context's `as_of_frontier`.
    /// * `start_signal`: forwarded unchanged from the caller.
    /// * `sinked_collection`, `err_collection`: the ok and error collections to sink,
    ///   already entered into the sink's region.
    /// * `ct_times`: optional collection forwarded from the caller
    ///   (presumably only relevant to continual task sinks; TODO confirm).
    /// * `output_probe`: probe handle forwarded from the caller.
    ///
    /// Returns an optional token. When one is returned, `export_sink` stores it together
    /// with the other tokens inside the sink's `SinkToken`.
    fn render_sink(
        &self,
        compute_state: &mut crate::compute_state::ComputeState,
        sink: &ComputeSinkDesc<CollectionMetadata>,
        sink_id: GlobalId,
        as_of: Antichain<mz_repr::Timestamp>,
        start_signal: StartSignal,
        sinked_collection: Collection<G, Row, Diff>,
        err_collection: Collection<G, DataflowError, Diff>,
        ct_times: Option<Collection<G, (), Diff>>,
        output_probe: &Handle<Timestamp>,
    ) -> Option<Rc<dyn Any>>;
}
211
212fn get_sink_render_for<G>(
213 connection: &ComputeSinkConnection<CollectionMetadata>,
214) -> Box<dyn SinkRender<G>>
215where
216 G: Scope<Timestamp = mz_repr::Timestamp>,
217{
218 match connection {
219 ComputeSinkConnection::Subscribe(connection) => Box::new(connection.clone()),
220 ComputeSinkConnection::MaterializedView(connection) => Box::new(connection.clone()),
221 ComputeSinkConnection::ContinualTask(connection) => Box::new(connection.clone()),
222 ComputeSinkConnection::CopyToS3Oneshot(connection) => Box::new(connection.clone()),
223 }
224}