mz_storage/render.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders ingestions and exports into timely dataflow
//!
//! ## Ingestions
//!
//! ### Overall structure
//!
//! Before describing any of the timely operators involved in ingesting a source it helps to
//! understand the high-level structure of the timely scopes involved. The reason for this
//! structure is that we ingest external sources with a source-specific, implementation-defined
//! timestamp type which tracks progress in a way that the source implementation understands. Each
//! source-specific timestamp must implement timely's `timely::progress::Timestamp` trait, which
//! makes it suitable for timestamping timely streams and, by extension, differential collections.
//!
//! On the other hand, Materialize expects a specific timestamp type for all its collections
//! (currently `mz_repr::Timestamp`), so at some point the dataflow's timestamp must change. More
//! generally, the ingestion dataflow starts with some timestamp type `FromTime` and ends with
//! another timestamp type `IntoTime`.
//!
//! Here we run into a problem, though, because we want to start with a timely stream of type
//! `Stream<G1: Scope<Timestamp=FromTime>, ..>` and end up using it in a scope `G2` whose timestamp
//! type is `IntoTime`. Timely dataflows are organized in scopes, where each scope has an
//! associated timestamp type that must refine the timestamp type of its parent scope. What
//! "refines" means is defined by the [`timely::progress::timestamp::Refines`] trait in timely.
//! `FromTime`, however, does not refine `IntoTime`, nor does `IntoTime` refine `FromTime`.
//!
//! In order to accomplish this we split ingestion dataflows into two scopes, both of which are
//! children of the root timely scope. The first scope is timestamped with `FromTime` and the
//! second one with `IntoTime`. To move timely streams from one scope to the other we must do so
//! manually. Each stream that needs to be transferred between scopes is first captured using
//! [`timely::dataflow::operators::capture::capture::Capture`] into a tokio unbounded mpsc channel.
//! The events in the channel record, in full detail, the worker-local view of the original
//! stream, and whoever controls the receiver can read them in the standard way of consuming the
//! async channel and work with them. How the receiver is turned back into a timely stream in the
//! destination scope is described in the next section.
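//!
//! As a minimal sketch of this two-scope structure, with `u64` and `std::time::Duration` as
//! arbitrary stand-ins for `FromTime` and `IntoTime` (every `Timestamp` refines the root
//! timestamp `()`, which is what allows the two sibling child scopes to coexist):
//!
//! ```ignore
//! use timely::dataflow::Scope;
//! use timely::dataflow::operators::{Inspect, ToStream};
//!
//! timely::execute_directly(|worker| {
//!     worker.dataflow::<(), _, _>(|root| {
//!         // The `FromTime` side: timestamped however the source implementation likes.
//!         root.scoped::<u64, _, _>("FromTime scope", |from_scope| {
//!             (0..3).to_stream(from_scope).inspect(|x| println!("from: {x}"));
//!         });
//!         // The `IntoTime` side: timestamped with what the rest of Materialize expects.
//!         root.scoped::<std::time::Duration, _, _>("IntoTime scope", |into_scope| {
//!             (0..3).to_stream(into_scope).inspect(|x| println!("into: {x}"));
//!         });
//!     });
//! });
//! ```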
//!
//! For now keep in mind the general structure of the dataflow:
//!
//!
//! ```text
//! +----------------RootScope(Timestamp=())------------------+
//! |                                                         |
//! |  +---FromTime Scope---+         +---IntoTime Scope--+   |
//! |  |                    |         |                   |   |
//! |  |                 *--+---------+-->                |   |
//! |  |                    |         |                   |   |
//! |  |                 <--+---------+--*                |   |
//! |  +--------------------+    ^    +-------------------+   |
//! |                            |                            |
//! |                            |                            |
//! |                   data exchanged between                |
//! |                 scopes with capture/reclock             |
//! +---------------------------------------------------------+
//! ```
//!
//! ### Detailed dataflow
//!
//! We are now ready to describe the detailed structure of the ingestion dataflow. The dataflow
//! begins with the `source reader` dataflow fragment, which is rendered in a `FromTime` timely
//! scope. This scope's timestamp is controlled by the [`crate::source::types::SourceRender::Time`]
//! associated type and can be anything the source implementation desires.
//!
//! Each source is free to render any arbitrary dataflow fragment in that scope as long as it
//! produces the collections expected by the rest of the framework. The rendering is handled by
//! the [`crate::source::types::SourceRender::render`] method.
//!
//! When rendering a source dataflow we expect three outputs. First, a health output, which is how
//! the source communicates status updates about its health. Second, a data output, which is the
//! main output of a source and contains the data that will eventually be recorded in the persist
//! shard. Finally, an optional upper frontier output, which tracks the overall upstream upper
//! frontier. When a source doesn't provide a dedicated progress output, the framework derives one
//! by observing the progress of the data output. This output (derived or not) is what drives
//! reclocking. When a source provides a dedicated upper output, it can manage it independently of
//! the data output frontier. For example, a source implementation may query the upstream system
//! for the latest available offsets and set the upper output based on that, even before the
//! actual ingestion has started and data and progress begin trickling in via the data output.
//!
//! ```text
//!                                             resume upper
//!                                          ,-----------------------.
//!                                          |                       |
//! +----------+    health output    ,-------+---------.             |
//! |  health  |---------------------|     source      |             |
//! | operator |                     |     reader      |             |
//! +----------+                     +---+-----------+-+             |
//!                          data output |           | upper output  |
//!   FromTime scope                     |           |               |
//! -------------------------------------|-----------|---------------|---
//!   IntoTime scope                     |           |               |
//!                                      |      +----+-----+         |
//!                                      |      |  remap   |         |
//!                                      |      | operator |         |
//!                                      |      +--+-------+         |
//!                                      |        /                  |
//!                                      |       / bindings          |
//!                                      |      /                    |
//!                                 +----+-----+                     |
//!                                 | reclock  |                     |
//!                                 | operator |                     |
//!                                 +-,--,---.-+                     |
//!        ,--------------------------´  |    \                      |
//!        |               ,-------------´     \                     |
//!        |               |                    \                    |
//!        |               |                     \                   |
//!  +-----+----+    +-----+----+          +-----+----+              |
//!  |  decode  |    |  decode  |    ....  |  decode  |              |
//!  | output 0 |    | output 1 |          | output N |              |
//!  +-----+----+    +-----+----+          +-----+----+              |
//!        |               |                     |                   |
//!  +-----+----+    +-----+----+          +-----+----+              |
//!  | envelope |    | envelope |    ....  | envelope |              |
//!  | output 0 |    | output 1 |          | output N |              |
//!  +-----+----+    +-----+----+          +-----+----+              |
//!        |               |                     |                   |
//!  +-----+----+    +-----+----+          +-----+----+              |
//!  | persist  |    | persist  |    ....  | persist  |              |
//!  |  sink 0  |    |  sink 1  |          |  sink N  |              |
//!  +-----+----+    +-----+----+          +-----+----+              |
//!        |               |                     |                   |
//!        `------------.  |   ,-----------------´                   |
//!                     |  |   |                                     |
//!                  +--+--+---+--+                                  |
//!                  |   resume   |                                  |
//!                  | calculator |                                  |
//!                  +-----+------+                                  |
//!                         \                                        |
//!                          `---------------------------------------´
//! ```
//!
//! #### Reclocking
//!
//! Whenever a dataflow edge crosses the scope boundary, it must first be converted into a
//! captured stream via the [`mz_timely_util::capture::UnboundedTokioCapture`] utility. This
//! disassociates the stream and its progress information from the original timely scope and
//! allows it to be read from a different place. The downside of this mechanism is that it's
//! invisible to timely's progress tracking, but that seems like a necessary evil if we want to do
//! reclocking.
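//!
//! A minimal sketch of the capture side of this mechanism, using timely's stock `capture()`
//! (which records into a std mpsc channel) rather than `UnboundedTokioCapture`:
//!
//! ```ignore
//! use timely::dataflow::operators::ToStream;
//! use timely::dataflow::operators::capture::{Capture, Event};
//!
//! // `capture()` hands back a channel receiver that records the worker-local history of the
//! // stream: data batches interleaved with the progress statements that describe them.
//! let recv = timely::example(|scope| (0..10).to_stream(scope).capture());
//!
//! // Whoever holds the receiver can consume the events by hand, which is how the remap and
//! // reclock operators described below read streams captured in the `FromTime` scope.
//! for event in recv {
//!     match event {
//!         Event::Messages(time, data) => println!("data at {time}: {data:?}"),
//!         Event::Progress(changes) => println!("progress: {changes:?}"),
//!     }
//! }
//! ```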
//!
//! The two main ways these tokio-fied streams are turned back into normal timely streams in the
//! destination scope are the `reclock operator` and the `remap operator`, which process the
//! `data output` and the `upper output` of the source reader respectively.
//!
//! The `remap operator` reads the `upper output`, which is composed only of frontiers, mints new
//! bindings, and writes them into the remap shard. The final durable timestamp bindings are
//! emitted as its output for consumption by the `reclock operator`.
//!
//! The `reclock operator` reads the `data output`, which contains both data and progress
//! statements, and uses the bindings it receives from the `remap operator` to reclock each piece
//! of data and each frontier statement into the target scope's timestamp and emit the reclocked
//! stream as its output.
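//!
//! As a purely hypothetical illustration of what the bindings accomplish (the offsets and
//! timestamps are made up), suppose the remap shard contains the following bindings:
//!
//! ```text
//! IntoTime 1000 -> FromTime frontier { offset 3 }   (covers offsets 0, 1, 2)
//! IntoTime 2000 -> FromTime frontier { offset 7 }   (covers offsets 0 through 6)
//! ```
//!
//! A piece of data at offset 5 is not covered by the binding at 1000 but is covered by the
//! binding at 2000, so the reclock operator emits it at `IntoTime` 2000.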
//!
//! #### Partitioning
//!
//! At this point we have a timely stream with correctly timestamped data in the mz time domain
//! (`mz_repr::Timestamp`) which contains multiplexed messages for each of the potential subsources
//! of this source. Each message selects the output it belongs to by setting the output field in
//! [`crate::source::types::SourceMessage`]. By convention, the main source output is always output
//! zero and subsources get the outputs from one onwards.
//!
//! However, regardless of whether the output is the main source or a subsource, it is treated
//! identically by the pipeline. Each output is demultiplexed into its own timely stream using
//! [`timely::dataflow::operators::partition::Partition`] and the rest of the ingestion pipeline is
//! rendered independently.
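//!
//! A minimal sketch of this demultiplexing step in isolation, where the `(output, datum)` pairs
//! stand in for the real multiplexed source messages:
//!
//! ```ignore
//! use timely::dataflow::operators::partition::Partition;
//! use timely::dataflow::operators::{Inspect, ToStream};
//!
//! timely::example(|scope| {
//!     // Six messages, multiplexed over three outputs.
//!     let multiplexed = (0..6u64).map(|i| (i % 3, i)).to_stream(scope);
//!
//!     // One stream per output; the routing closure returns (output index, datum).
//!     let outputs = multiplexed.partition(3, |(output, datum)| (output, datum));
//!
//!     for (index, stream) in outputs.into_iter().enumerate() {
//!         stream.inspect(move |datum| println!("output {index}: {datum:?}"));
//!     }
//! });
//! ```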
//!
//! #### Resumption frontier
//!
//! At the end of each per-output dataflow fragment is an instance of `persist_sink`, which is
//! responsible for writing the final `Row` data into the corresponding output shard. The durable
//! uppers of all the output shards are then recombined by calculating the minimum upper frontier
//! between them. This is what we refer to as the "resumption frontier" or "resume upper", and at
//! this stage it is expressed in terms of `IntoTime` timestamps. As a final step, this resumption
//! frontier is converted back into a `FromTime` timestamped frontier using
//! `ReclockFollower::source_upper_at_frontier` and connected back to the source reader operator.
//! This frontier is what drives the `OffsetCommiter`, which informs the upstream system that it
//! may release resources up to the specified offsets.
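//!
//! A minimal sketch of the "minimum upper" calculation, assuming totally ordered timestamps such
//! as `mz_repr::Timestamp`. The helper name and the `Option` encoding are illustrative only:
//! `None` stands for the empty frontier of a shard that is already closed and therefore places no
//! constraint on the resume upper.
//!
//! ```ignore
//! fn resume_upper(shard_uppers: &[Option<u64>]) -> Option<u64> {
//!     shard_uppers.iter().flatten().copied().min()
//! }
//!
//! // Shard 0 is written up to 100, shard 1 up to 70, and shard 2 is closed:
//! // the ingestion as a whole must resume from 70.
//! assert_eq!(resume_upper(&[Some(100), Some(70), None]), Some(70));
//! ```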
//!
//! ## Exports
//!
//! Not yet documented

use std::collections::BTreeMap;
use std::rc::Rc;
use std::sync::Arc;

use mz_ore::error::ErrorExt;
use mz_repr::{GlobalId, Row};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs;
use mz_storage_types::oneshot_sources::{OneshotIngestionDescription, OneshotIngestionRequest};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::{GenericSourceConnection, IngestionDescription, SourceConnection};
use mz_timely_util::antichain::AntichainExt;
use timely::communication::Allocate;
use timely::dataflow::Scope;
use timely::dataflow::operators::{Concatenate, ConnectLoop, Feedback, Leave, Map};
use timely::dataflow::scopes::Child;
use timely::progress::Antichain;
use timely::worker::{AsWorker, Worker as TimelyWorker};
use tokio::sync::Semaphore;

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::source::RawSourceCreationConfig;
use crate::storage_state::StorageState;

mod persist_sink;
pub mod sinks;
pub mod sources;

/// Assemble the "ingestion" side of a dataflow, i.e. the sources.
///
/// This method creates a new dataflow to host the implementations of the sources described by
/// `description`, and registers the tokens that keep them alive in `storage_state`.
pub fn build_ingestion_dataflow<A: Allocate>(
    timely_worker: &mut TimelyWorker<A>,
    storage_state: &mut StorageState,
    primary_source_id: GlobalId,
    description: IngestionDescription<CollectionMetadata>,
    as_of: Antichain<mz_repr::Timestamp>,
    resume_uppers: BTreeMap<GlobalId, Antichain<mz_repr::Timestamp>>,
    source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
) {
    let worker_id = timely_worker.index();
    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
    let debug_name = primary_source_id.to_string();
    let name = format!("Source dataflow: {debug_name}");
    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, root_scope| {
        let root_scope: &mut Child<_, ()> = root_scope;
        // Here we need to create two scopes. One timestamped with `()`, which is the root scope,
        // and one timestamped with `mz_repr::Timestamp` which is the final scope of the dataflow.
        // Refer to the module documentation for an explanation of this structure.
        // The scope.clone() occurs to allow import in the region.
        root_scope.clone().scoped(&name, |mz_scope| {
            let debug_name = format!("{debug_name}-sources");

            let mut tokens = vec![];

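            // Create a feedback edge: the per-export upper frontiers produced by the persist
            // sinks at the end of the pipeline are concatenated and fed back through this handle
            // into the source rendering, where they drive the resumption frontier.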
            let (feedback_handle, feedback) = mz_scope.feedback(Default::default());

            let connection = description.desc.connection.clone();
            tracing::info!(
                id = %primary_source_id,
                as_of = %as_of.pretty(),
                resume_uppers = ?resume_uppers,
                source_resume_uppers = ?source_resume_uppers,
                "timely-{worker_id} building {} source pipeline", connection.name(),
            );

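            // The busy signal is shared between the source operators and the persist sinks
            // rendered below: a single permit when `SUSPENDABLE_SOURCES` is enabled, and
            // effectively unlimited permits (i.e. never blocking) otherwise.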
            let busy_signal = if dyncfgs::SUSPENDABLE_SOURCES
                .get(storage_state.storage_configuration.config_set())
            {
                Arc::new(Semaphore::new(1))
            } else {
                Arc::new(Semaphore::new(Semaphore::MAX_PERMITS))
            };

            let base_source_config = RawSourceCreationConfig {
                name: format!("{}-{}", connection.name(), primary_source_id),
                id: primary_source_id,
                source_exports: description.source_exports.clone(),
                timestamp_interval: description.desc.timestamp_interval,
                worker_id: mz_scope.index(),
                worker_count: mz_scope.peers(),
                now_fn: storage_state.now.clone(),
                metrics: storage_state.metrics.clone(),
                as_of: as_of.clone(),
                resume_uppers: resume_uppers.clone(),
                source_resume_uppers,
                storage_metadata: description.ingestion_metadata.clone(),
                persist_clients: Arc::clone(&storage_state.persist_clients),
                source_statistics: storage_state
                    .aggregated_statistics
                    .get_source(&primary_source_id)
                    .expect("statistics initialized")
                    .clone(),
                shared_remap_upper: Rc::clone(
                    &storage_state.source_uppers[&description.remap_collection_id],
                ),
                // This might be quite a large clone, but it's just during rendering.
                config: storage_state.storage_configuration.clone(),
                remap_collection_id: description.remap_collection_id.clone(),
                busy_signal: Arc::clone(&busy_signal),
            };

            let (outputs, source_health, source_tokens) = match connection {
                GenericSourceConnection::Kafka(c) => crate::render::sources::render_source(
                    mz_scope,
                    &debug_name,
                    c,
                    description.clone(),
                    &feedback,
                    storage_state,
                    base_source_config,
                ),
                GenericSourceConnection::Postgres(c) => crate::render::sources::render_source(
                    mz_scope,
                    &debug_name,
                    c,
                    description.clone(),
                    &feedback,
                    storage_state,
                    base_source_config,
                ),
                GenericSourceConnection::MySql(c) => crate::render::sources::render_source(
                    mz_scope,
                    &debug_name,
                    c,
                    description.clone(),
                    &feedback,
                    storage_state,
                    base_source_config,
                ),
                GenericSourceConnection::SqlServer(c) => crate::render::sources::render_source(
                    mz_scope,
                    &debug_name,
                    c,
                    description.clone(),
                    &feedback,
                    storage_state,
                    base_source_config,
                ),
                GenericSourceConnection::LoadGenerator(c) => crate::render::sources::render_source(
                    mz_scope,
                    &debug_name,
                    c,
                    description.clone(),
                    &feedback,
                    storage_state,
                    base_source_config,
                ),
            };
            tokens.extend(source_tokens);

            let mut upper_streams = vec![];
            let mut health_streams = Vec::with_capacity(source_health.len() + outputs.len());
            health_streams.extend(source_health);
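            // Render one persist_sink per export of this ingestion, collecting each sink's upper
            // (which feeds the resumption feedback loop above) and its health updates.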
            for (export_id, (ok, err)) in outputs {
                let export = &description.source_exports[&export_id];
                let source_data = ok.map(Ok).concat(&err.map(Err));

                let metrics = storage_state.metrics.get_source_persist_sink_metrics(
                    export_id,
                    primary_source_id,
                    worker_id,
                    &export.storage_metadata.data_shard,
                );

                tracing::info!(
                    id = %primary_source_id,
                    "timely-{worker_id}: persisting export {} of {}",
                    export_id,
                    primary_source_id
                );
                let (upper_stream, errors, sink_tokens) = crate::render::persist_sink::render(
                    mz_scope,
                    export_id,
                    export.storage_metadata.clone(),
                    source_data,
                    storage_state,
                    metrics,
                    Arc::clone(&busy_signal),
                );
                upper_streams.push(upper_stream);
                tokens.extend(sink_tokens);

                let sink_health = errors.map(move |err: Rc<anyhow::Error>| {
                    let halt_status =
                        HealthStatusUpdate::halting(err.display_with_causes().to_string(), None);
                    HealthStatusMessage {
                        id: None,
                        namespace: StatusNamespace::Internal,
                        update: halt_status,
                    }
                });
                health_streams.push(sink_health.leave());
            }

            mz_scope
                .concatenate(upper_streams)
                .connect_loop(feedback_handle);

            let health_stream = root_scope.concatenate(health_streams);
            let health_token = crate::healthcheck::health_operator(
                root_scope,
                storage_state.now.clone(),
                resume_uppers
                    .iter()
                    .filter_map(|(id, frontier)| {
                        // If the collection isn't closed, then we will remark it as Starting as
                        // the dataflow comes up.
                        (!frontier.is_empty()).then_some(*id)
                    })
                    .collect(),
                primary_source_id,
                "source",
                &health_stream,
                crate::healthcheck::DefaultWriter {
                    command_tx: storage_state.internal_cmd_tx.clone(),
                    updates: Rc::clone(&storage_state.shared_status_updates),
                },
                storage_state
                    .storage_configuration
                    .parameters
                    .record_namespaced_errors,
                dyncfgs::STORAGE_SUSPEND_AND_RESTART_DELAY
                    .get(storage_state.storage_configuration.config_set()),
            );
            tokens.push(health_token);

            storage_state
                .source_tokens
                .insert(primary_source_id, tokens);
        })
    });
}

/// Assemble the "export" side of a dataflow, i.e. the sinks.
pub fn build_export_dataflow<A: Allocate>(
    timely_worker: &mut TimelyWorker<A>,
    storage_state: &mut StorageState,
    id: GlobalId,
    description: StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
) {
    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
    let debug_name = id.to_string();
    let name = format!("Source dataflow: {debug_name}");
    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, root_scope| {
        // The scope.clone() occurs to allow import in the region.
        // We build a region here to establish a pattern of a scope inside the dataflow
        // so that other similar uses (e.g. with iterative scopes) do not require weird
        // alternate type signatures.
        root_scope.region_named(&name, |scope| {
            let mut tokens = vec![];
            let (health_stream, sink_tokens) =
                crate::render::sinks::render_sink(scope, storage_state, id, &description);
            tokens.extend(sink_tokens);

            // Note that sinks also have only 1 active worker, which simplifies the work that
            // `health_operator` has to do internally.
            let health_token = crate::healthcheck::health_operator(
                scope,
                storage_state.now.clone(),
                [id].into_iter().collect(),
                id,
                "sink",
                &health_stream,
                crate::healthcheck::DefaultWriter {
                    command_tx: storage_state.internal_cmd_tx.clone(),
                    updates: Rc::clone(&storage_state.shared_status_updates),
                },
                storage_state
                    .storage_configuration
                    .parameters
                    .record_namespaced_errors,
                dyncfgs::STORAGE_SUSPEND_AND_RESTART_DELAY
                    .get(storage_state.storage_configuration.config_set()),
            );
            tokens.push(health_token);

            storage_state.sink_tokens.insert(id, tokens);
        })
    });
}

pub(crate) fn build_oneshot_ingestion_dataflow<A: Allocate>(
    timely_worker: &mut TimelyWorker<A>,
    storage_state: &mut StorageState,
    ingestion_id: uuid::Uuid,
    collection_id: GlobalId,
    collection_meta: CollectionMetadata,
    description: OneshotIngestionRequest,
) {
    let (results_tx, results_rx) = tokio::sync::mpsc::unbounded_channel();
    let callback = move |result| {
        // TODO(cf3): Do we care if the receiver has gone away?
        //
        // Persist is working on cleaning up leaked blobs, we could also use `OneshotReceiverExt`
        // here, but that might run into the infamous async-Drop problem.
        let _ = results_tx.send(result);
    };
    let connection_context = storage_state
        .storage_configuration
        .connection_context
        .clone();

    let tokens = timely_worker.dataflow(|scope| {
        mz_storage_operators::oneshot_source::render(
            scope.clone(),
            Arc::clone(&storage_state.persist_clients),
            connection_context,
            collection_id,
            collection_meta,
            description,
            callback,
        )
    });
    let ingestion_description = OneshotIngestionDescription {
        tokens,
        results: results_rx,
    };

    storage_state
        .oneshot_ingestions
        .insert(ingestion_id, ingestion_description);
}