
mz_compute/command_channel.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A channel for broadcasting compute commands from worker 0 to other workers.
//!
//! Compute uses a dataflow to distribute commands between workers. This ensures that workers
//! retain a consistent dataflow state across reconnects. If each worker received its commands
//! directly from the controller, there would be no guarantee that after a reconnect all workers
//! have seen the same sequence of commands. This is particularly problematic for
//! `CreateDataflow` commands, since Timely requires that all workers render the same dataflows in
//! the same order. The controller therefore sends commands only to worker 0, which then
//! broadcasts them to the other workers through the Timely fabric, taking care of the correct
//! sequencing.
//!
//! Commands in the command channel are tagged with a nonce identifying the incarnation of the
//! compute protocol the command belongs to, allowing workers to recognize client reconnects that
//! require a reconciliation.
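//!
//! A minimal usage sketch, assuming a worker loop that also talks to the controller
//! (`controller_rx` and `handle_command` are hypothetical names, not part of this module):
//!
//! ```ignore
//! let (cmd_tx, cmd_rx) = render(timely_worker);
//!
//! // Forward commands received from the controller into the channel. Only worker 0's
//! // sends are broadcast; see `render` below.
//! if let Some((command, nonce)) = controller_rx.try_recv() {
//!     cmd_tx.send((command, nonce));
//! }
//!
//! // Every worker drains its share of the broadcast commands.
//! while let Some((command, nonce)) = cmd_rx.try_recv() {
//!     handle_command(command, nonce);
//! }
//! ```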

use std::sync::mpsc::{self, TryRecvError};
use std::sync::{Arc, Mutex};

use itertools::Itertools;
use mz_compute_client::protocol::command::ComputeCommand;
use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
use mz_ore::cast::CastFrom;
use mz_timely_util::scope_label::ScopeExt;
use timely::communication::Allocate;
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::Operator;
use timely::dataflow::operators::generic::source;
use timely::scheduling::{Scheduler, SyncActivator};
use timely::worker::{AsWorker, Worker as TimelyWorker};
use uuid::Uuid;

/// A sender pushing commands onto the command channel.
pub struct Sender {
    tx: mpsc::Sender<(ComputeCommand, Uuid)>,
    activator: Arc<Mutex<Option<SyncActivator>>>,
}

impl Sender {
    /// Broadcasts the given command to all workers.
    pub fn send(&self, message: (ComputeCommand, Uuid)) {
        if self.tx.send(message).is_err() {
            unreachable!("command channel never shuts down");
        }

        self.activator
            .lock()
            .expect("poisoned")
            .as_ref()
            .map(|a| a.activate());
    }
}

/// A receiver reading commands from the command channel.
pub struct Receiver {
    rx: mpsc::Receiver<(ComputeCommand, Uuid)>,
}

impl Receiver {
    /// Returns the next available command, if any.
    ///
    /// This returns `None` when no commands are currently available, but more commands might
    /// arrive in the future.
    pub fn try_recv(&self) -> Option<(ComputeCommand, Uuid)> {
        match self.rx.try_recv() {
            Ok(msg) => Some(msg),
            Err(TryRecvError::Empty) => None,
            Err(TryRecvError::Disconnected) => {
                unreachable!("command channel never shuts down");
            }
        }
    }
}

/// Render the command channel dataflow.
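///
/// Returns a `Sender` for pushing commands into the channel and a `Receiver` from which this
/// worker reads the commands broadcast to it.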
pub fn render<A: Allocate>(timely_worker: &mut TimelyWorker<A>) -> (Sender, Receiver) {
    let (input_tx, input_rx) = mpsc::channel();
    let (output_tx, output_rx) = mpsc::channel();
    let activator = Arc::new(Mutex::new(None));

    // TODO(teskje): This implementation relies on Timely channels preserving the order of their
    // inputs, which is not something they guarantee. We can avoid that by using explicit indexing,
    // like storage's command sequencer does.
    timely_worker.dataflow_named::<u64, _, _>("command_channel", {
        let activator = Arc::clone(&activator);
        move |scope| {
            let scope = &mut scope.with_label();

            source(scope, "command_channel::source", |cap, info| {
                let sync_activator = scope.sync_activator_for(info.address.to_vec());
                *activator.lock().expect("poisoned") = Some(sync_activator);

                let worker_id = scope.index();
                let peers = scope.peers();

                // Only worker 0 broadcasts commands; other workers must drop their capability to
                // avoid holding up dataflow progress.
                let mut capability = (worker_id == 0).then_some(cap);

                move |output| {
                    let Some(cap) = &mut capability else {
                        // Non-leader workers will still receive `UpdateConfiguration` commands and
                        // we must drain those to not leak memory.
                        while let Ok((cmd, _nonce)) = input_rx.try_recv() {
                            assert_ne!(worker_id, 0);
                            assert!(matches!(cmd, ComputeCommand::UpdateConfiguration(_)));
                        }
                        return;
                    };

                    assert_eq!(worker_id, 0);

                    let input: Vec<_> = input_rx.try_iter().collect();
                    for (cmd, nonce) in input {
                        let worker_cmds =
                            split_command(cmd, peers).map(|(idx, cmd)| (idx, cmd, nonce));
                        output.session(&cap).give_iterator(worker_cmds);

                        cap.downgrade(&(cap.time() + 1));
                    }
                }
            })
            .sink(
                Exchange::new(|(idx, _, _)| u64::cast_from(*idx)),
                "command_channel::sink",
                move |(input, _)| {
                    input.for_each(|_time, data| {
                        for (_idx, cmd, nonce) in data.drain(..) {
                            let _ = output_tx.send((cmd, nonce));
                        }
                    });
                },
            );
        }
    });

    let tx = Sender {
        tx: input_tx,
        activator,
    };
    let rx = Receiver { rx: output_rx };

    (tx, rx)
}

/// Split the given command into the given number of parts.
///
/// Returns an iterator that produces each command part, along with its part index.
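///
/// For any command other than `CreateDataflow`, the parts are simply clones of the original
/// command. A sketch of the expected shape (`cmd` stands in for such a command):
///
/// ```ignore
/// let parts: Vec<_> = split_command(cmd, 4).collect();
/// assert_eq!(parts.len(), 4);
/// assert_eq!(parts[0].0, 0); // part indices count up from 0
/// ```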
fn split_command(
    command: ComputeCommand,
    parts: usize,
) -> impl Iterator<Item = (usize, ComputeCommand)> {
    use itertools::Either;

    let commands = match command {
        ComputeCommand::CreateDataflow(dataflow) => {
            let dataflow = *dataflow;

            // A list of descriptions of objects for each part to build.
            let mut builds_parts = vec![Vec::new(); parts];
            // Partition each build description among `parts`.
            for build_desc in dataflow.objects_to_build {
                let build_part = build_desc.plan.partition_among(parts);
                for (plan, objects_to_build) in
                    build_part.into_iter().zip_eq(builds_parts.iter_mut())
                {
                    objects_to_build.push(BuildDesc {
                        id: build_desc.id,
                        plan,
                    });
                }
            }

            // Each list of build descriptions results in a dataflow description.
            let commands = builds_parts
                .into_iter()
                .map(move |objects_to_build| DataflowDescription {
                    source_imports: dataflow.source_imports.clone(),
                    index_imports: dataflow.index_imports.clone(),
                    objects_to_build,
                    index_exports: dataflow.index_exports.clone(),
                    sink_exports: dataflow.sink_exports.clone(),
                    as_of: dataflow.as_of.clone(),
                    until: dataflow.until.clone(),
                    debug_name: dataflow.debug_name.clone(),
                    initial_storage_as_of: dataflow.initial_storage_as_of.clone(),
                    refresh_schedule: dataflow.refresh_schedule.clone(),
                    time_dependence: dataflow.time_dependence.clone(),
                })
                .map(Box::new)
                .map(ComputeCommand::CreateDataflow);
            Either::Left(commands)
        }
        command => {
            let commands = std::iter::repeat_n(command, parts);
            Either::Right(commands)
        }
    };

    commands.into_iter().enumerate()
}