// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Renders a plan into a timely/differential dataflow computation.
//!
//! ## Error handling
//!
//! Timely and differential have no idioms for computations that can error. The
//! philosophy is, reasonably, to define the semantics of the computation such
//! that errors are unnecessary: e.g., by using wrap-around semantics for
//! integer overflow.
//!
//! Unfortunately, SQL semantics are not nearly so elegant, and require errors
//! in myriad cases. The classic example is a division by zero, but invalid
//! input for casts, overflowing integer operations, and dozens of other
//! functions need the ability to produce errors at runtime.
//!
//! At the moment, only *scalar* expression evaluation can fail, so only
//! operators that evaluate scalar expressions can fail. At the time of writing,
//! that includes map, filter, reduce, and join operators. Constants are a bit
//! of a special case: they can be either a constant vector of rows *or* a
//! constant, singular error.
//!
//! The approach taken is to build two parallel trees of computation: one for
//! the rows that have been successfully evaluated (the "oks tree"), and one for
//! the errors that have been generated (the "errs tree"). For example:
//!
//! ```text
//!    oks1  errs1       oks2  errs2
//!      |     |           |     |
//!      |     |           |     |
//!   project  |           |     |
//!      |     |           |     |
//!      |     |           |     |
//!     map    |           |     |
//!      |\    |           |     |
//!      | \   |           |     |
//!      |  \  |           |     |
//!      |   \ |           |     |
//!      |    \|           |     |
//!   project  +           +     +
//!      |     |          /     /
//!      |     |         /     /
//!    join ------------+     /
//!      |     |             /
//!      |     | +----------+
//!      |     |/
//!     oks   errs
//! ```
//!
//! The project operation cannot fail, so errors from errs1 are propagated
//! directly. Map operators are fallible and so can inject additional errors
//! into the stream. Join operators combine the errors from each of their
//! inputs.
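//!
//! As a rough illustration of the diagram above, the following sketch models
//! each collection as a plain `Vec` rather than a differential dataflow
//! collection. The names `Row`, `map_div`, `project`, and `join` are
//! hypothetical and exist only for this example; they are not the operators
//! used by this crate.
//!
//! ```rust
//! // Hypothetical stand-ins: real collections are not vectors.
//! type Row = Vec<i64>;
//! type Streams = (Vec<Row>, Vec<EvalError>);
//!
//! #[derive(Debug, Clone, PartialEq)]
//! enum EvalError {
//!     DivisionByZero,
//! }
//!
//! /// Fallible map: appends `row[0] / row[1]`, or injects an error.
//! fn map_div((oks, mut errs): Streams) -> Streams {
//!     let mut out = Vec::new();
//!     for mut row in oks {
//!         if row[1] == 0 {
//!             errs.push(EvalError::DivisionByZero);
//!         } else {
//!             row.push(row[0].wrapping_div(row[1]));
//!             out.push(row);
//!         }
//!     }
//!     (out, errs)
//! }
//!
//! /// Infallible project: keeps `cols` and passes errors through untouched.
//! fn project((oks, errs): Streams, cols: &[usize]) -> Streams {
//!     let oks: Vec<Row> = oks
//!         .into_iter()
//!         .map(|row| cols.iter().map(|&c| row[c]).collect())
//!         .collect();
//!     (oks, errs)
//! }
//!
//! /// Join on column 0: the errs output combines the errors of both inputs.
//! fn join((l_oks, mut errs): Streams, (r_oks, r_errs): Streams) -> Streams {
//!     errs.extend(r_errs);
//!     let mut oks = Vec::new();
//!     for l in &l_oks {
//!         for r in r_oks.iter().filter(|r| r[0] == l[0]) {
//!             let mut row = l.clone();
//!             row.extend_from_slice(&r[1..]);
//!             oks.push(row);
//!         }
//!     }
//!     (oks, errs)
//! }
//!
//! fn main() {
//!     let left = map_div((vec![vec![6, 3], vec![1, 0]], vec![]));
//!     let left = project(left, &[0, 2]);
//!     let right = (vec![vec![6, 10]], vec![]);
//!     let (oks, errs) = join(left, right);
//!     assert_eq!(oks, vec![vec![6, 2, 10]]);
//!     assert_eq!(errs, vec![EvalError::DivisionByZero]);
//! }
//! ```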
//!
//! The semantics of the error stream are minimal. From the perspective of SQL,
//! a dataflow is considered to be in an error state if there is at least one
//! element in the final errs collection. The error value returned to the user
//! is selected arbitrarily; SQL only makes provisions to return one error to
//! the user at a time. There are plans to make the errs collection accessible
//! to end users, so they can see all errors at once.
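//!
//! A minimal sketch of that rule, assuming the final collections have been
//! gathered into vectors. The function name `peek_result` and the
//! string-carrying `EvalError` are hypothetical. This sketch happens to return
//! the first error, but callers should not rely on which one is chosen.
//!
//! ```rust
//! #[derive(Debug, Clone, PartialEq)]
//! struct EvalError(String);
//!
//! /// Returns the rows only if the errs collection is empty; otherwise
//! /// returns one error, selected arbitrarily.
//! fn peek_result<R>(oks: Vec<R>, errs: Vec<EvalError>) -> Result<Vec<R>, EvalError> {
//!     match errs.into_iter().next() {
//!         Some(err) => Err(err),
//!         None => Ok(oks),
//!     }
//! }
//!
//! fn main() {
//!     assert_eq!(peek_result(vec![1, 2], vec![]), Ok(vec![1, 2]));
//!
//!     let errs = vec![
//!         EvalError("division by zero".into()),
//!         EvalError("invalid input for cast".into()),
//!     ];
//!     // A single element in errs puts the whole dataflow in an error state.
//!     assert!(peek_result(vec![1, 2], errs).is_err());
//! }
//! ```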
//!
//! To make errors transient, simply ensure that the operator can retract any
//! produced errors when corrected data arrives. To make errors permanent, write
//! the operator such that it never retracts the errors it produced. Future work
//! will likely want to introduce an ordering for errors, so that permanent
//! errors are returned to the user ahead of transient errors, probably by
//! introducing a new error type along these lines:
//!
//! ```no_run
//! # struct EvalError;
//! # struct SourceError;
//! enum DataflowError {
//!     Transient(EvalError),
//!     Permanent(SourceError),
//! }
//! ```
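//!
//! To illustrate retraction and one possible ordering, the sketch below
//! models the errs collection as `(error, diff)` updates, where a diff of
//! `-1` retracts an earlier error. The helpers `consolidate` and
//! `first_error`, and the string payloads, are assumptions made for this
//! example; they are not an existing API.
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
//! struct EvalError(String);
//! #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
//! struct SourceError(String);
//!
//! #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
//! enum DataflowError {
//!     Transient(EvalError),
//!     Permanent(SourceError),
//! }
//!
//! /// Sums the diffs per error and keeps errors with a nonzero net count.
//! fn consolidate(updates: &[(DataflowError, i64)]) -> Vec<DataflowError> {
//!     let mut counts: BTreeMap<&DataflowError, i64> = BTreeMap::new();
//!     for (err, diff) in updates {
//!         *counts.entry(err).or_insert(0) += diff;
//!     }
//!     counts
//!         .into_iter()
//!         .filter(|(_, n)| *n != 0)
//!         .map(|(err, _)| err.clone())
//!         .collect()
//! }
//!
//! /// Picks the error to report, preferring permanent over transient ones.
//! fn first_error(mut errs: Vec<DataflowError>) -> Option<DataflowError> {
//!     // `false` sorts before `true`, so permanent errors come first.
//!     errs.sort_by_key(|e| !matches!(e, DataflowError::Permanent(_)));
//!     errs.into_iter().next()
//! }
//!
//! fn main() {
//!     let div = DataflowError::Transient(EvalError("division by zero".into()));
//!     let src = DataflowError::Permanent(SourceError("corrupt record".into()));
//!
//!     let updates: Vec<(DataflowError, i64)> =
//!         vec![(div.clone(), 1), (src.clone(), 1)];
//!     assert_eq!(first_error(consolidate(&updates)), Some(src));
//!
//!     // Corrected data arrives: the transient error is retracted.
//!     let updates: Vec<(DataflowError, i64)> =
//!         vec![(div.clone(), 1), (div, -1)];
//!     assert!(consolidate(&updates).is_empty());
//! }
//! ```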
//!
//! If the error stream is empty, the oks stream must be correct. If the error
//! stream is non-empty, then there are no semantics for the oks stream. This is
//! sufficient to support SQL in its current form, but is likely to be
//! unsatisfactory long term. We suspect that we can continue to imbue the oks
//! stream with semantics if we are very careful in describing what data should
//! and should not be produced upon encountering an error. Roughly speaking, the
//! oks stream could represent the correct result of the computation where all
//! rows that caused an error have been pruned from the stream. There are
//! strange and confusing questions here around foreign keys, though: what if
//! the optimizer proves that a particular key must exist in a collection, but
//! the key gets pruned away because its row participated in a scalar expression
//! evaluation that errored?
//!
//! In the meantime, it is probably wise for operators to keep the oks stream
//! roughly "as correct as possible" even when errors are present in the errs
//! stream. This reduces the amount of recomputation that must be performed
//! if/when the errors are retracted.
use std::collections::{BTreeMap, BTreeSet};
use std::rc::Rc;

use timely::communication::Allocate;
use timely::dataflow::Scope;
use timely::progress::Antichain;
use timely::worker::Worker as TimelyWorker;

use mz_repr::GlobalId;

use crate::controller::CollectionMetadata;
use crate::storage_state::StorageState;
use crate::types::sinks::StorageSinkDesc;
use crate::types::sources::IngestionDescription;

mod debezium;
mod persist_sink;
pub mod sinks;
pub mod sources;
mod upsert;

/// Assemble the "ingestion" side of a dataflow, i.e. the sources.
///
/// This function creates a new dataflow that renders the source described by `description`,
/// writes its `ok` and `err` output to persist, and stores the source's token in
/// `storage_state.source_tokens` under `id`. It returns nothing.
pub fn build_ingestion_dataflow<A: Allocate>(
    timely_worker: &mut TimelyWorker<A>,
    storage_state: &mut StorageState,
    id: GlobalId,
    description: IngestionDescription<CollectionMetadata>,
    resume_upper: Antichain<mz_repr::Timestamp>,
) {
    let worker_logging = timely_worker.log_register().get("timely");
    let debug_name = id.to_string();
    let name = format!("Source dataflow: {debug_name}");
    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        // The scope.clone() occurs to allow import in the region.
        // We build a region here to establish a pattern of a scope inside the dataflow,
        // so that other similar uses (e.g. with iterative scopes) do not require weird
        // alternate type signatures.
        scope.clone().region_named(&name, |region| {
            let debug_name = format!("{debug_name}-sources");

            let ((ok, err), token) = crate::render::sources::render_source(
                region,
                &debug_name,
                id,
                description.clone(),
                resume_upper,
                // NOTE: For now sources never have LinearOperators but might have in the future
                None,
                storage_state,
            );

            // Merge the oks and errs streams into a single stream of `Result`s.
            let source_data = ok.map(Ok).concat(&err.map(Err));

            crate::render::persist_sink::render(
                region,
                id,
                description.storage_metadata,
                source_data,
                storage_state,
                Rc::clone(&token),
            );

            storage_state.source_tokens.insert(id, token);
        })
    });
}

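/// Assemble the "export" side of a dataflow, i.e. the sinks.
///
/// This function creates a new dataflow that renders the sink described by `description`.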
pub fn build_export_dataflow<A: Allocate>(
    timely_worker: &mut TimelyWorker<A>,
    storage_state: &mut StorageState,
    id: GlobalId,
    description: StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
) {
    let worker_logging = timely_worker.log_register().get("timely");
    let debug_name = id.to_string();
    let name = format!("Sink dataflow: {debug_name}");
    timely_worker.dataflow_core(&name, worker_logging, Box::new(()), |_, scope| {
        // The scope.clone() occurs to allow import in the region.
        // We build a region here to establish a pattern of a scope inside the dataflow,
        // so that other similar uses (e.g. with iterative scopes) do not require weird
        // alternate type signatures.
        scope.clone().region_named(&name, |region| {
            let _debug_name = format!("{debug_name}-sinks");
            // Pin down the type of `region` so that inference succeeds below.
            let _: &mut timely::dataflow::scopes::Child<
                timely::dataflow::scopes::Child<TimelyWorker<A>, _>,
                mz_repr::Timestamp,
            > = region;
            let mut tokens = BTreeMap::new();
            let import_ids = BTreeSet::new();
            crate::render::sinks::render_sink(
                region,
                storage_state,
                &mut tokens,
                import_ids,
                id,
                &description,
            );
        })
    });
}