1use std::any::Any;
13use std::collections::{BTreeMap, BTreeSet};
14use std::rc::Rc;
15
16use differential_dataflow::VecCollection;
17use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc};
18use mz_expr::{EvalError, MapFilterProject, permutation_for_arrangement};
19use mz_ore::soft_assert_or_log;
20use mz_ore::str::StrExt;
21use mz_ore::vec::PartialOrdVecExt;
22use mz_repr::{Diff, GlobalId, Row};
23use mz_storage_types::controller::CollectionMetadata;
24use mz_storage_types::errors::DataflowError;
25use mz_timely_util::operator::CollectionExt;
26use mz_timely_util::probe::Handle;
27use timely::container::CapacityContainerBuilder;
28use timely::dataflow::Scope;
29use timely::progress::Antichain;
30
31use crate::compute_state::SinkToken;
32use crate::logging::compute::LogDataflowErrors;
33use crate::render::context::Context;
34use crate::render::{RenderTimestamp, StartSignal};
35
36impl<'g, T: RenderTimestamp> Context<'g, T> {
37 pub(crate) fn export_sink(
39 &self,
40 compute_state: &mut crate::compute_state::ComputeState,
41 tokens: &BTreeMap<GlobalId, Rc<dyn Any>>,
42 dependency_ids: BTreeSet<GlobalId>,
43 sink_id: GlobalId,
44 sink: &ComputeSinkDesc<CollectionMetadata>,
45 start_signal: StartSignal,
46 ct_times: Option<VecCollection<'g, mz_repr::Timestamp, (), Diff>>,
47 output_probe: &Handle<mz_repr::Timestamp>,
48 outer_scope: Scope<'g, mz_repr::Timestamp>,
49 ) {
50 soft_assert_or_log!(
51 sink.non_null_assertions.is_strictly_sorted(),
52 "non-null assertions not sorted"
53 );
54
55 let mut needed_tokens = Vec::new();
57 for dep_id in dependency_ids {
58 if let Some(token) = tokens.get(&dep_id) {
59 needed_tokens.push(Rc::clone(token))
60 }
61 }
62
63 let bundle = self
68 .lookup_id(mz_expr::Id::Global(sink.from))
69 .expect("Sink source collection not loaded");
70 let (ok_collection, mut err_collection) = if let Some(collection) = &bundle.collection {
71 collection.clone()
72 } else {
73 let (key, _arrangement) = bundle
74 .arranged
75 .iter()
76 .next()
77 .expect("Invariant violated: at least one collection must be present.");
78 let unthinned_arity = sink.from_desc.arity();
79 let (permutation, thinning) = permutation_for_arrangement(key, unthinned_arity);
80 let mut mfp = MapFilterProject::new(unthinned_arity);
81 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
82 bundle.as_collection_core(
83 mfp,
84 Some((key.clone(), None)),
85 self.until.clone(),
86 &self.config_set,
87 )
88 };
89
90 if let Some(logger) = compute_state.compute_logger.clone() {
92 err_collection = err_collection.log_dataflow_errors(logger, sink_id);
93 }
94
95 let mut ok_collection = ok_collection.leave(outer_scope);
96 let mut err_collection = err_collection.leave(outer_scope);
97
98 if let Some(&expiration) = self.dataflow_expiration.as_option() {
101 ok_collection = ok_collection
102 .expire_collection_at(&format!("{}_export_sink_oks", self.debug_name), expiration);
103 err_collection = err_collection
104 .expire_collection_at(&format!("{}_export_sink_errs", self.debug_name), expiration);
105 }
106
107 let non_null_assertions = sink.non_null_assertions.clone();
108 let from_desc = sink.from_desc.clone();
109 if !non_null_assertions.is_empty() {
110 let name = format!("NullAssertions({sink_id:?})");
111 type CB<C> = CapacityContainerBuilder<C>;
112 let (oks, null_errs) =
113 ok_collection.map_fallible::<CB<_>, CB<_>, _, _, _>(&name, move |row| {
114 let mut idx = 0;
115 let mut iter = row.iter();
116 for &i in &non_null_assertions {
117 let skip = i - idx;
118 let datum = iter.nth(skip).unwrap();
119 idx += skip + 1;
120 if datum.is_null() {
121 return Err(DataflowError::EvalError(Box::new(
122 EvalError::MustNotBeNull(
123 format!("column {}", from_desc.get_name(i).quoted()).into(),
124 ),
125 )));
126 }
127 }
128 Ok(row)
129 });
130 ok_collection = oks;
131 err_collection = err_collection.concat(null_errs);
132 }
133
134 let region_name = match sink.connection {
135 ComputeSinkConnection::Subscribe(_) => format!("SubscribeSink({:?})", sink_id),
136 ComputeSinkConnection::MaterializedView(_) => {
137 format!("MaterializedViewSink({:?})", sink_id)
138 }
139 ComputeSinkConnection::ContinualTask(_) => {
140 format!("ContinualTaskSink({:?})", sink_id)
141 }
142 ComputeSinkConnection::CopyToS3Oneshot(_) => {
143 format!("CopyToS3OneshotSink({:?})", sink_id)
144 }
145 };
146 outer_scope.clone().region_named(®ion_name, |inner| {
147 let sink_render = get_sink_render_for(&sink.connection);
148
149 let sink_token = sink_render.render_sink(
150 compute_state,
151 sink,
152 sink_id,
153 self.as_of_frontier.clone(),
154 start_signal,
155 ok_collection.enter_region(inner),
156 err_collection.enter_region(inner),
157 ct_times.map(|x| x.enter_region(inner)),
158 output_probe,
159 );
160
161 if let Some(sink_token) = sink_token {
162 needed_tokens.push(sink_token);
163 }
164
165 let collection = compute_state.expect_collection_mut(sink_id);
166 collection.sink_token = Some(SinkToken::new(Box::new(needed_tokens)));
167 });
168 }
169}
170
/// A type that knows how to render a compute sink into a dataflow scope.
///
/// `get_sink_render_for` boxes the payload of every `ComputeSinkConnection` variant as a
/// `dyn SinkRender`, so each of those payload types implements this trait.
pub(crate) trait SinkRender<'scope> {
    /// Renders the sink `sink_id`, described by `sink`, from the given collections.
    ///
    /// As called from `Context::export_sink`:
    ///
    /// * `compute_state` is the mutable compute state the sink is rendered for.
    /// * `as_of` is the rendering context's `as_of_frontier`.
    /// * `start_signal` is passed through unchanged from the caller of `export_sink`.
    /// * `sinked_collection` and `err_collection` are the ok and error collections to sink,
    ///   already entered into the sink's region.
    /// * `ct_times` is an optional collection of unit records, also entered into the sink's
    ///   region. NOTE(review): presumably only used by continual task sinks -- confirm.
    /// * `output_probe` is a probe handle supplied by the caller. NOTE(review): presumably
    ///   attached to the sink's output to track progress -- confirm.
    ///
    /// Returns an optional token. `Context::export_sink` stores a returned token, together
    /// with the tokens of the sink's dependencies, in the `sink_token` of the collection
    /// state for `sink_id`. NOTE(review): presumably dropping the token shuts the sink
    /// down -- confirm.
    fn render_sink(
        &self,
        compute_state: &mut crate::compute_state::ComputeState,
        sink: &ComputeSinkDesc<CollectionMetadata>,
        sink_id: GlobalId,
        as_of: Antichain<mz_repr::Timestamp>,
        start_signal: StartSignal,
        sinked_collection: VecCollection<'scope, mz_repr::Timestamp, Row, Diff>,
        err_collection: VecCollection<'scope, mz_repr::Timestamp, DataflowError, Diff>,
        ct_times: Option<VecCollection<'scope, mz_repr::Timestamp, (), Diff>>,
        output_probe: &Handle<mz_repr::Timestamp>,
    ) -> Option<Rc<dyn Any>>;
}
188
189fn get_sink_render_for<'scope>(
190 connection: &ComputeSinkConnection<CollectionMetadata>,
191) -> Box<dyn SinkRender<'scope>> {
192 match connection {
193 ComputeSinkConnection::Subscribe(connection) => Box::new(connection.clone()),
194 ComputeSinkConnection::MaterializedView(connection) => Box::new(connection.clone()),
195 ComputeSinkConnection::ContinualTask(connection) => Box::new(connection.clone()),
196 ComputeSinkConnection::CopyToS3Oneshot(connection) => Box::new(connection.clone()),
197 }
198}