// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders ingestions and exports into timely dataflow
//!
//! ## Ingestions
//!
//! ### Overall structure
//!
//! Before describing any of the timely operators involved in ingesting a source it helps to
//! understand the high level structure of the timely scopes involved. The reason for this
//! structure is the fact that we ingest external sources with a source-specific, and source
//! implementation defined, timestamp type which tracks progress in a way that the source
//! implementation understands. Each source-specific timestamp must implement timely's
//! `timely::progress::Timestamp` trait, which makes it suitable for timestamping timely streams
//! and, by extension, differential collections.
//!
//! On the other hand, Materialize expects a specific timestamp type for all its collections
//! (currently `mz_repr::Timestamp`) so at some point the dataflow's timestamp must change. More
//! generally, the ingestion dataflow starts with some timestamp type `FromTime` and ends with
//! another timestamp type `IntoTime`.
//!
//! Here we run into a problem though, because we want to start with a timely stream of type
//! `Stream<G1: Scope<Timestamp=FromTime>, ..>` and end up using it in a scope `G2` whose timestamp
//! type is `IntoTime`. Timely dataflows are organized in scopes where each scope has an associated
//! timestamp type that must refine the timestamp type of its parent scope. What "refines" means is
//! defined by the [`timely::progress::timestamp::Refines`] trait in timely. `FromTime` however
//! does not refine `IntoTime` nor does `IntoTime` refine `FromTime`.
//!
//! To accomplish this we split ingestion dataflows into two scopes, both of which are
//! children of the root timely scope. The first scope is timestamped with `FromTime` and the
//! second one with `IntoTime`. To move timely streams from one scope to the other we must do
//! so manually. Each stream that needs to be transferred between scopes is first captured using
//! [`timely::dataflow::operators::capture::capture::Capture`] into a tokio unbounded mpsc channel.
//! The data in the channel records in full detail the worker-local view of the original stream,
//! and whoever controls the receiver can read the events, in the standard way of consuming an
//! async channel, and work with them. How the receiver is turned back into a timely stream in the
//! destination scope is described in the next section.
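//!
//! As a rough illustration of the captured form of a stream, the sketch below models it as a
//! channel of events and replays it on the receiving side. It is not the actual implementation:
//! the `Event` enum is a simplified stand-in modeled on timely's capture events, `replay` is a
//! hypothetical helper, timestamps are plain `u64`s, and a std channel takes the place of the
//! tokio one.
//!
//! ```rust
//! use std::collections::BTreeMap;
//! use std::sync::mpsc::Receiver;
//!
//! /// Simplified stand-in for one captured event (an assumption of this sketch).
//! #[derive(Debug, Clone, PartialEq)]
//! enum Event<T, D> {
//!     /// Changes to the number of outstanding capabilities at each time.
//!     Progress(Vec<(T, i64)>),
//!     /// A batch of records, all at the same time.
//!     Messages(T, Vec<D>),
//! }
//!
//! /// Drains the channel and returns the timestamped records together with the final
//! /// frontier, i.e. the smallest time that still has outstanding capabilities.
//! fn replay<D>(rx: Receiver<Event<u64, D>>) -> (Vec<(u64, D)>, Option<u64>) {
//!     let mut data = Vec::new();
//!     let mut counts: BTreeMap<u64, i64> = BTreeMap::new();
//!     // Iterating a receiver ends once every sender has been dropped.
//!     for event in rx {
//!         match event {
//!             Event::Progress(changes) => {
//!                 for (time, diff) in changes {
//!                     *counts.entry(time).or_insert(0) += diff;
//!                 }
//!             }
//!             Event::Messages(time, batch) => {
//!                 data.extend(batch.into_iter().map(|record| (time, record)));
//!             }
//!         }
//!     }
//!     // Times whose changes cancelled out no longer hold the frontier back.
//!     counts.retain(|_, count| *count != 0);
//!     let frontier = counts.keys().next().copied();
//!     (data, frontier)
//! }
//! ```
//!
//! In this model, replaying `Progress([(0, 1)])`, `Messages(0, [a, b])` and
//! `Progress([(0, -1), (5, 1)])` yields the two records at time `0` and a frontier of `Some(5)`.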
//!
//! For now keep in mind the general structure of the dataflow:
//!
//! ```text
//! +----------------RootScope(Timestamp=())------------------+
//! |                                                         |
//! |  +---FromTime Scope---+         +---IntoTime Scope--+   |
//! |  |                    |         |                   |   |
//! |  |                 *--+---------+-->                |   |
//! |  |                    |         |                   |   |
//! |  |                 <--+---------+--*                |   |
//! |  +--------------------+    ^    +-------------------+   |
//! |                            |                            |
//! |                            |                            |
//! |                  data exchanged between                 |
//! |                 scopes with capture/reclock             |
//! +---------------------------------------------------------+
//! ```
//!
//! ### Detailed dataflow
//!
//! We are now ready to describe the detailed structure of the ingestion dataflow. The dataflow
//! begins with the `source reader` dataflow fragment which is rendered in a `FromTime` timely
//! scope. This scope's timestamp is controlled by the [`crate::source::types::SourceRender::Time`]
//! associated type and can be anything the source implementation desires.
//!
//! Each source is free to render any arbitrary dataflow fragment in that scope as long as it
//! produces the collections expected by the rest of the framework. The rendering is handled by the
//! [`crate::source::types::SourceRender::render`] method.
//!
//! When rendering a source dataflow we expect three outputs. First, a health output, which is how
//! the source communicates status updates about its health. Second, a data output, which is the
//! main output of a source and contains the data that will eventually be recorded in the persist
//! shard. Finally, an optional upper frontier output, which tracks the overall upstream upper
//! frontier. When a source doesn't provide a dedicated upper output the framework derives one
//! by observing the progress of the data output. This output (derived or not) is what drives
//! reclocking. When a source provides a dedicated upper output, it can manage it independently of
//! the data output frontier. For example, a source implementation can query the upstream system
//! for its latest offsets and set the upper output based on that, even before starting the
//! actual ingestion, which then shows up as data and progress trickling in via the data output.
//!
//! ```text
//!                                                   resume upper
//!                                              ,--------------------.
//!                                             /                     |
//!                            health     ,----+---.                  |
//!                            output     | source |                  |
//!                           ,-----------| reader |                  |
//!                          /            +--,---.-+                  |
//!                         /               /     \                   |
//!                  +-----/----+   data   /       \  upper           |
//!                  |  health  |   output/         \ output          |
//!                  | operator |         |          \                |
//!                  +----------+         |           |               |
//!  FromTime                             |           |               |
//!     scope                             |           |               |
//!  -------------------------------------|-----------|---------------|---
//!  IntoTime                             |           |               |
//!     scope                             |      ,----+-----.         |
//!                                       |     |  remap   |          |
//!                                       |     | operator |          |
//!                                       |     +---,------+          |
//!                                       |        /                  |
//!                                       |       / bindings          |
//!                                       |      /                    |
//!                                     ,-+-----+--.                  |
//!                                     | reclock  |                  |
//!                                     | operator |                  |
//!                                     +-,--,---.-+                  |
//!                           ,----------´.-´     \                   |
//!                       _.-´         .-´         \                  |
//!                   _.-´          .-´             \                 |
//!                .-´            ,´                 \                |
//!               /              /                    \               |
//!        ,----------.   ,----------.           ,----------.         |
//!        |  decode  |   |  decode  |   ....    |  decode  |         |
//!        | output 0 |   | output 1 |           | output N |         |
//!        +-----+----+   +-----+----+           +-----+----+         |
//!              |              |                      |              |
//!              |              |                      |              |
//!        ,-----+----.   ,-----+----.           ,-----+----.         |
//!        | envelope |   | envelope |   ....    | envelope |         |
//!        | output 0 |   | output 1 |           | output N |         |
//!        +----------+   +-----+----+           +-----+----+         |
//!              |              |                      |              |
//!              |              |                      |              |
//!        ,-----+----.   ,-----+----.           ,-----+----.         |
//!        |  persist |   |  persist |   ....    |  persist |         |
//!        |  sink 0  |   |  sink 1  |           |  sink N  |         |
//!        +-----+----+   +-----+----+           +-----+----+         |
//!               \              \                    /               |
//!                `-.            `,                 /                |
//!                   `-._          `-.             /                 |
//!                       `-._         `-.         /                  |
//!                           `---------. `-.     /                   |
//!                                     +`---`---+---,                |
//!                                     |   resume   |                |
//!                                     | calculator |                |
//!                                     +------+-----+                |
//!                                             \                     |
//!                                              `-------------------´
//! ```
//!
//! #### Reclocking
//!
//! Whenever a dataflow edge crosses the scope boundaries it must first be converted into a
//! captured stream via the [`mz_timely_util::capture::UnboundedTokioCapture`] utility. This
//! disassociates the stream and its progress information from the original timely scope and allows
//! it to be read from a different place. The downside of this mechanism is that it's invisible to
//! timely's progress tracking, but that seems like a necessary evil if we want to do reclocking.
//!
//! The two main ways these tokio-fied streams are turned back into normal timely streams in the
//! destination scope are the `reclock operator` and the `remap operator`, which process the
//! `data output` and `upper output` of the source reader respectively.
//!
//! The `remap operator` reads the `upper output`, which is composed only of frontiers, mints new
//! bindings, and writes them into the remap shard. The final durable timestamp bindings are
//! emitted as its output for consumption by the `reclock operator`.
//!
//! The `reclock operator` reads the `data output`, which contains both data and progress
//! statements, and uses the bindings it receives from the `remap operator` to reclock each piece
//! of data and each frontier statement into the target scope's timestamp and emit the reclocked
//! stream in its output.
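//!
//! The reclocking step can be pictured with a small model. This is only a sketch under
//! simplifying assumptions, not the operator's actual implementation: both time domains are plain
//! `u64`s, and each binding is a hypothetical `(into_time, from_upper)` pair stating that every
//! offset below `from_upper` is assigned a time no later than `into_time`. The helper name
//! `reclock` is likewise made up for illustration.
//!
//! ```rust
//! /// Returns the `IntoTime` for `offset`, or `None` if no binding covers it yet.
//! ///
//! /// Assumes `bindings` is sorted by `into_time` and that `from_upper` never decreases.
//! fn reclock(bindings: &[(u64, u64)], offset: u64) -> Option<u64> {
//!     // Index of the first binding whose upper is strictly beyond `offset`.
//!     let idx = bindings.partition_point(|&(_, from_upper)| from_upper <= offset);
//!     bindings.get(idx).map(|&(into_time, _)| into_time)
//! }
//! ```
//!
//! With the bindings `[(1000, 3), (2000, 7)]`, offsets 0 through 2 are reclocked to `1000`,
//! offsets 3 through 6 to `2000`, and offset 7 has to wait for a later binding.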
//!
//! #### Partitioning
//!
//! At this point we have a timely stream with correctly timestamped data in the mz time domain
//! (`mz_repr::Timestamp`) which contains multiplexed messages for each of the potential subsources
//! of this source. Each message selects the output it belongs to by setting the output field in
//! [`crate::source::types::SourceMessage`]. By convention, the main source output is always output
//! zero and subsources get the outputs from one onwards.
//!
//! However, regardless of whether the output is the main source or a subsource it is treated
//! identically by the pipeline. Each output is demultiplexed into its own timely stream using
//! [`timely::dataflow::operators::core::partition::Partition`] and the rest of the ingestion
//! pipeline is rendered independently.
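//!
//! Stripped of timely, the demultiplexing amounts to routing each message by its output index.
//! The sketch below uses a hypothetical, simplified `Message` type and a made-up `demux` helper
//! operating on plain vectors; the real pipeline does this on streams with the `Partition`
//! operator.
//!
//! ```rust
//! /// Simplified message: a payload tagged with the index of the output it belongs to.
//! struct Message<D> {
//!     output: usize,
//!     value: D,
//! }
//!
//! /// Splits `messages` into one vector per output, preserving the order within each output.
//! fn demux<D>(messages: Vec<Message<D>>, outputs: usize) -> Vec<Vec<D>> {
//!     let mut parts: Vec<Vec<D>> = (0..outputs).map(|_| Vec::new()).collect();
//!     for message in messages {
//!         // Messages naming an unknown output are dropped in this sketch.
//!         if let Some(part) = parts.get_mut(message.output) {
//!             part.push(message.value);
//!         }
//!     }
//!     parts
//! }
//! ```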
//!
//! #### Resumption frontier
//!
//! At the end of each per-output dataflow fragment is an instance of `persist_sink`, which is
//! responsible for writing the final `Row` data into the corresponding output shard. The durable
//! upper of each of the output shards is then recombined in a way that calculates the minimum
//! upper frontier between them. This is what we refer to as the "resumption frontier" or "resume
//! upper" and at this stage it is expressed in terms of `IntoTime` timestamps. As a final step,
//! this resumption frontier is converted back into a `FromTime` timestamped frontier using
//! `ReclockFollower::source_upper_at_frontier` and connected back to the source reader operator.
//! This frontier is what drives the `OffsetCommiter` which informs the upstream system to release
//! resources up to the specified offsets.
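//!
//! The two steps can be modeled as follows. This is a sketch under simplifying assumptions and
//! does not reproduce `ReclockFollower::source_upper_at_frontier`: times are totally ordered
//! `u64`s, a frontier is an `Option<u64>` where `None` stands for the empty frontier of a closed
//! shard, bindings are hypothetical `(into_time, from_upper)` pairs, and both helper names are
//! made up.
//!
//! ```rust
//! /// Minimum of the per-output uppers. A closed output (`None`) never holds the result back,
//! /// so the result is `None` only if every output is closed.
//! fn resume_upper(uppers: &[Option<u64>]) -> Option<u64> {
//!     uppers.iter().flatten().copied().min()
//! }
//!
//! /// Source offset to resume from, given that all times below `resume_upper` are durable.
//! ///
//! /// Assumes `bindings` is sorted by `into_time`; each pair states that every offset below
//! /// `from_upper` was assigned a time no later than `into_time`.
//! fn source_upper_at(bindings: &[(u64, u64)], resume_upper: u64) -> u64 {
//!     // Number of bindings whose time lies strictly below the resume upper.
//!     let idx = bindings.partition_point(|&(into_time, _)| into_time < resume_upper);
//!     if idx == 0 { 0 } else { bindings[idx - 1].1 }
//! }
//! ```
//!
//! For example, with output uppers `[Some(2001), Some(1001), None]` the resume upper is
//! `Some(1001)`, and with bindings `[(1000, 3), (2000, 7)]` that maps back to source offset `3`.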
//!
//! ## Exports
//!
//! Not yet documented

use std::collections::BTreeMap;
use std::rc::Rc;
use std::sync::Arc;

use mz_ore::error::ErrorExt;
use mz_repr::{GlobalId, Row};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs;
use mz_storage_types::oneshot_sources::{OneshotIngestionDescription, OneshotIngestionRequest};
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::{GenericSourceConnection, IngestionDescription, SourceConnection};
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::scope_label::ScopeExt;
use timely::dataflow::operators::vec::Map;
use timely::dataflow::operators::{Concatenate, ConnectLoop, Feedback, Leave};
use timely::progress::Antichain;
use timely::worker::Worker as TimelyWorker;
use tokio::sync::Semaphore;

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::source::RawSourceCreationConfig;
use crate::storage_state::StorageState;

mod persist_sink;
pub mod sinks;
pub mod sources;

/// Assemble the "ingestion" side of a dataflow, i.e. the sources.
///
/// This method creates a new dataflow to host the implementations of the sources described by
/// the `description` argument. It returns nothing; the tokens produced while rendering are stored
/// in `storage_state.source_tokens` under `primary_source_id`.
pub fn build_ingestion_dataflow(
    timely_worker: &mut TimelyWorker,
    storage_state: &mut StorageState,
    primary_source_id: GlobalId,
    description: IngestionDescription<CollectionMetadata>,
    as_of: Antichain<mz_repr::Timestamp>,
    resume_uppers: BTreeMap<GlobalId, Antichain<mz_repr::Timestamp>>,
    source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
) {
    let worker_id = timely_worker.index();
    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
    let debug_name = primary_source_id.to_string();
    let name = format!("Source dataflow: {debug_name}");
    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, root_scope| {
        let root_scope = root_scope.with_label();

        // Here we need to create two scopes. One timestamped with `()`, which is the root scope,
        // and one timestamped with `mz_repr::Timestamp` which is the final scope of the dataflow.
        // Refer to the module documentation for an explanation of this structure.
        // The scope.clone() occurs to allow import in the region.
        root_scope.clone().scoped(&name, |mz_scope| {
            let debug_name = format!("{debug_name}-sources");

            let mut tokens = vec![];

            let (feedback_handle, feedback) = mz_scope.feedback(Default::default());

            let connection = description.desc.connection.clone();
            tracing::info!(
                id = %primary_source_id,
                as_of = %as_of.pretty(),
                resume_uppers = ?resume_uppers,
                source_resume_uppers = ?source_resume_uppers,
                "timely-{worker_id} building {} source pipeline", connection.name(),
            );

            let busy_signal = if dyncfgs::SUSPENDABLE_SOURCES
                .get(storage_state.storage_configuration.config_set())
            {
                Arc::new(Semaphore::new(1))
            } else {
                Arc::new(Semaphore::new(Semaphore::MAX_PERMITS))
            };

            let base_source_config = RawSourceCreationConfig {
                name: format!("{}-{}", connection.name(), primary_source_id),
                id: primary_source_id,
                source_exports: description.source_exports.clone(),
                timestamp_interval: description.desc.timestamp_interval,
                worker_id: mz_scope.index(),
                worker_count: mz_scope.peers(),
                now_fn: storage_state.now.clone(),
                metrics: storage_state.metrics.clone(),
                as_of: as_of.clone(),
                resume_uppers: resume_uppers.clone(),
                source_resume_uppers,
                remap_metadata: description.remap_metadata.clone(),
                persist_clients: Arc::clone(&storage_state.persist_clients),
                statistics: storage_state
                    .aggregated_statistics
                    .get_ingestion_stats(&primary_source_id),
                shared_remap_upper: Rc::clone(
                    &storage_state.source_uppers[&description.remap_collection_id],
                ),
                // This might be quite a large clone, but it's just during rendering
                config: storage_state.storage_configuration.clone(),
                remap_collection_id: description.remap_collection_id,
                busy_signal: Arc::clone(&busy_signal),
            };

            let (outputs, source_health, source_tokens) = match connection {
                GenericSourceConnection::Kafka(c) => crate::render::sources::render_source(
                    mz_scope,
                    root_scope,
                    &debug_name,
                    c,
                    description.clone(),
                    feedback,
                    storage_state,
                    base_source_config,
                ),
                GenericSourceConnection::Postgres(c) => crate::render::sources::render_source(
                    mz_scope,
                    root_scope,
                    &debug_name,
                    c,
                    description.clone(),
                    feedback,
                    storage_state,
                    base_source_config,
                ),
                GenericSourceConnection::MySql(c) => crate::render::sources::render_source(
                    mz_scope,
                    root_scope,
                    &debug_name,
                    c,
                    description.clone(),
                    feedback,
                    storage_state,
                    base_source_config,
                ),
                GenericSourceConnection::SqlServer(c) => crate::render::sources::render_source(
                    mz_scope,
                    root_scope,
                    &debug_name,
                    c,
                    description.clone(),
                    feedback,
                    storage_state,
                    base_source_config,
                ),
                GenericSourceConnection::LoadGenerator(c) => crate::render::sources::render_source(
                    mz_scope,
                    root_scope,
                    &debug_name,
                    c,
                    description.clone(),
                    feedback,
                    storage_state,
                    base_source_config,
                ),
            };
            tokens.extend(source_tokens);

            let mut upper_streams = vec![];
            let mut health_streams = Vec::with_capacity(source_health.len() + outputs.len());
            health_streams.extend(source_health);
            for (export_id, (ok, err)) in outputs {
                let export = &description.source_exports[&export_id];
                let source_data = ok.map(Ok).concat(err.map(Err));

                let metrics = storage_state.metrics.get_source_persist_sink_metrics(
                    export_id,
                    primary_source_id,
                    worker_id,
                    &export.storage_metadata.data_shard,
                );

                tracing::info!(
                    id = %primary_source_id,
                    "timely-{worker_id}: persisting export {} of {}",
                    export_id,
                    primary_source_id
                );
                let (upper_stream, errors, sink_tokens) = crate::render::persist_sink::render(
                    mz_scope,
                    export_id,
                    export.storage_metadata.clone(),
                    source_data,
                    storage_state,
                    metrics,
                    Arc::clone(&busy_signal),
                );
                upper_streams.push(upper_stream);
                tokens.extend(sink_tokens);

                let sink_health = errors.map(move |err: Rc<anyhow::Error>| {
                    let halt_status =
                        HealthStatusUpdate::halting(err.display_with_causes().to_string(), None);
                    HealthStatusMessage {
                        id: None,
                        namespace: StatusNamespace::Internal,
                        update: halt_status,
                    }
                });
                health_streams.push(sink_health.leave(root_scope));
            }

            mz_scope
                .concatenate(upper_streams)
                .connect_loop(feedback_handle);

            let health_stream = root_scope.concatenate(health_streams);
            let health_token = crate::healthcheck::health_operator(
                root_scope,
                storage_state.now.clone(),
                resume_uppers
                    .iter()
                    .filter_map(|(id, frontier)| {
                        // If the collection isn't closed, then we will remark it as Starting as
                        // the dataflow comes up.
                        (!frontier.is_empty()).then_some(*id)
                    })
                    .collect(),
                primary_source_id,
                "source",
                health_stream,
                crate::healthcheck::DefaultWriter {
                    command_tx: storage_state.internal_cmd_tx.clone(),
                    updates: Rc::clone(&storage_state.shared_status_updates),
                },
                storage_state
                    .storage_configuration
                    .parameters
                    .record_namespaced_errors,
                dyncfgs::STORAGE_SUSPEND_AND_RESTART_DELAY
                    .get(storage_state.storage_configuration.config_set()),
            );
            tokens.push(health_token);

            storage_state
                .source_tokens
                .insert(primary_source_id, tokens);
        })
    });
}

/// Assemble the "export" side of a dataflow, i.e. the sink identified by `id`.
///
/// This method creates a new dataflow that renders the sink described by `description` together
/// with its health operator, and stores the resulting tokens in `storage_state.sink_tokens`.
pub fn build_export_dataflow(
    timely_worker: &mut TimelyWorker,
    storage_state: &mut StorageState,
    id: GlobalId,
    description: StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
) {
    let worker_logging = timely_worker.logger_for("timely").map(Into::into);
    let debug_name = id.to_string();
    let name = format!("Source dataflow: {debug_name}");
    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        let scope = scope.with_label();

        let mut tokens = vec![];
        let (health_stream, sink_tokens) =
            crate::render::sinks::render_sink(scope, storage_state, id, &description);
        tokens.extend(sink_tokens);

        // Note that sinks also have only 1 active worker, which simplifies the work that
        // `health_operator` has to do internally.
        let health_token = crate::healthcheck::health_operator(
            scope,
            storage_state.now.clone(),
            [id].into_iter().collect(),
            id,
            "sink",
            health_stream,
            crate::healthcheck::DefaultWriter {
                command_tx: storage_state.internal_cmd_tx.clone(),
                updates: Rc::clone(&storage_state.shared_status_updates),
            },
            storage_state
                .storage_configuration
                .parameters
                .record_namespaced_errors,
            dyncfgs::STORAGE_SUSPEND_AND_RESTART_DELAY
                .get(storage_state.storage_configuration.config_set()),
        );
        tokens.push(health_token);

        storage_state.sink_tokens.insert(id, tokens);
    });
}

/// Assemble the dataflow for a oneshot ingestion.
///
/// Results are delivered through an unbounded channel whose receiver is stored, together with the
/// dataflow's tokens, in `storage_state.oneshot_ingestions` under `ingestion_id`.
pub(crate) fn build_oneshot_ingestion_dataflow(
    timely_worker: &mut TimelyWorker,
    storage_state: &mut StorageState,
    ingestion_id: uuid::Uuid,
    collection_id: GlobalId,
    collection_meta: CollectionMetadata,
    description: OneshotIngestionRequest,
) {
    let (results_tx, results_rx) = tokio::sync::mpsc::unbounded_channel();
    let callback = move |result| {
        // TODO(cf3): Do we care if the receiver has gone away?
        //
        // Persist is working on cleaning up leaked blobs, we could also use `OneshotReceiverExt`
        // here, but that might run into the infamous async-Drop problem.
        let _ = results_tx.send(result);
    };
    let connection_context = storage_state
        .storage_configuration
        .connection_context
        .clone();

    let name = format!("Oneshot ingestion: {ingestion_id}");
    let tokens = timely_worker.dataflow_named(&name, |scope| {
        let scope = scope.with_label();
        mz_storage_operators::oneshot_source::render(
            scope,
            Arc::clone(&storage_state.persist_clients),
            connection_context,
            collection_id,
            collection_meta,
            description,
            callback,
        )
    });
    let ingestion_description = OneshotIngestionDescription {
        tokens,
        results: results_rx,
    };

    storage_state
        .oneshot_ingestions
        .insert(ingestion_id, ingestion_description);
}
520
521    storage_state
522        .oneshot_ingestions
523        .insert(ingestion_id, ingestion_description);
524}