// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A client backed by multiple replicas.
//!
//! This client accepts commands and responds as would a correctly implemented client.
//! Its implementation is wrapped around clients that may fail at any point, and restart.
//! To accommodate this, it records the commands it accepts, and should a client restart
//! the commands are replayed at it, with some modification. As the clients respond, the
//! wrapper client tracks the responses and ensures that they are "logically deduplicated",
//! so that the receiver need not be aware of the replication and restarting.
//!
//! This tactic requires that dataflows be restartable, which they generally are not, due
//! to allowed compaction of their source data. This client must correctly observe commands
//! that allow for compaction of its assets, and only attempt to rebuild them as of those
//! compacted frontiers, as the underlying resources to rebuild them any earlier may not
//! exist any longer.
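//!
//! # Example
//!
//! A minimal sketch of the intended usage (not a doctest); `replica_a`, `replica_b`,
//! `id_a`, `id_b`, and `command` are placeholders, not items defined in this crate.
//!
//! ```ignore
//! let mut client = ActiveReplication::default();
//! // Each added replica is replayed the recorded command history.
//! client.add_replica(id_a, replica_a);
//! client.add_replica(id_b, replica_b);
//! // Commands are recorded and broadcast to every replica...
//! client.send(command).await?;
//! // ...while responses are logically deduplicated across replicas.
//! while let Some(response) = client.recv().await? {
//!     // Handle `response`.
//! }
//! ```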

use std::collections::{HashMap, VecDeque};

use chrono::Utc;
use timely::progress::frontier::MutableAntichain;
use timely::progress::Antichain;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;

use mz_ore::tracing::OpenTelemetryContext;
use mz_repr::GlobalId;

use super::{ActiveReplicationResponse, ReplicaId};
use super::{ComputeClient, GenericClient};
use super::{ComputeCommand, ComputeResponse};
use super::{Peek, PeekResponse};

/// Spawns a task that repeatedly sends messages back and forth
/// between a client and its owner, and returns channels to communicate with it.
///
/// This can be useful because sending on an unbounded `mpsc` channel is synchronous,
/// eliminating cancelation-unsafety in some cases.
///
/// For this to be useful, `Client::recv` must itself be cancelation-safe.
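///
/// # Example
///
/// A minimal sketch (not a doctest); `client` is assumed to implement
/// `GenericClient<C, R>` and `command` to be a value of type `C`.
///
/// ```ignore
/// let (cmd_tx, mut resp_rx) = spawn_client_task(client, || "example client task");
/// // Sending a command is synchronous, and therefore cancelation-safe.
/// cmd_tx.send(command).expect("client task terminated");
/// // Responses (or errors) from the wrapped client arrive on the receiver.
/// if let Some(result) = resp_rx.recv().await {
///     // Handle `result: Result<R, anyhow::Error>`.
/// }
/// ```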
pub fn spawn_client_task<
    C: Send + 'static,
    R: Send + 'static,
    Client: GenericClient<C, R> + 'static,
    Name: AsRef<str>,
    NameClosure: FnOnce() -> Name,
>(
    mut client: Client,
    nc: NameClosure,
) -> (
    UnboundedSender<C>,
    UnboundedReceiver<Result<R, anyhow::Error>>,
) {
    let (cmd_tx, mut cmd_rx) = unbounded_channel();
    let (response_tx, response_rx) = unbounded_channel();
    mz_ore::task::spawn(nc, async move {
        loop {
            tokio::select! {
                m = cmd_rx.recv() => {
                    match m {
                        Some(c) => {
                            // Issues should be detected, and
                            // reconnect attempted, on the `client.recv` path.
                            let _ = client.send(c).await;
                        },
                        None => break,
                    }
                },
                m = client.recv() => {
                    match m.transpose() {
                        Some(m) => {
                            if response_tx.send(m).is_err() {
                                break;
                            }
                        }
                        None => break,
                    }
                }
            }
        }
    });
    (cmd_tx, response_rx)
}

/// Additional information to store with pending peeks.
#[derive(Debug)]
pub struct PendingPeek {
    /// The OpenTelemetry context for this peek.
    otel_ctx: OpenTelemetryContext,
}

/// A client backed by multiple replicas.
#[derive(Debug)]
pub struct ActiveReplication<T> {
    /// Handles to the replicas themselves.
    replicas: HashMap<
        ReplicaId,
        (
            UnboundedSender<ComputeCommand<T>>,
            UnboundedReceiverStream<Result<ComputeResponse<T>, anyhow::Error>>,
        ),
    >,
    /// Outstanding peek identifiers, to guide responses (and which to suppress).
    peeks: HashMap<uuid::Uuid, PendingPeek>,
    /// Reported frontier of each in-progress tail.
    tails: HashMap<GlobalId, Antichain<T>>,
    /// Frontier information, both unioned across all replicas and from each individual replica.
    uppers: HashMap<GlobalId, (Antichain<T>, HashMap<ReplicaId, MutableAntichain<T>>)>,
    /// The command history, used when introducing new replicas or restarting existing replicas.
    history: crate::client::ComputeCommandHistory<T>,
    /// Most recent count of the volume of unpacked commands (e.g. dataflows in `CreateDataflows`).
    last_command_count: usize,
    /// Responses that should be emitted on the next `recv` call.
    ///
    /// This is introduced to produce peek cancelation responses eagerly, without awaiting a replica
    /// responding with the response itself, which allows us to compact away the peek in `self.history`.
    pending_response: VecDeque<ActiveReplicationResponse<T>>,
}

impl<T> Default for ActiveReplication<T> {
    fn default() -> Self {
        Self {
            replicas: Default::default(),
            peeks: Default::default(),
            tails: Default::default(),
            uppers: Default::default(),
            history: Default::default(),
            last_command_count: 0,
            pending_response: Default::default(),
        }
    }
}

impl<T> ActiveReplication<T>
where
    T: timely::progress::Timestamp,
{
    /// Introduce a new replica, and catch it up to the commands of other replicas.
    ///
    /// It is not yet clear under which circumstances a replica can be removed.
    pub fn add_replica<C: ComputeClient<T> + 'static>(&mut self, id: ReplicaId, client: C) {
        for (_, frontiers) in self.uppers.values_mut() {
            frontiers.insert(id, {
                let mut frontier = timely::progress::frontier::MutableAntichain::new();
                frontier.update_iter(Some((T::minimum(), 1)));
                frontier
            });
        }
        let (cmd_tx, resp_rx) =
            spawn_client_task(client, || "ActiveReplication client message pump");
        self.replicas.insert(id, (cmd_tx, resp_rx.into()));
        self.hydrate_replica(id);
    }

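    /// Returns the identifiers of all known replicas.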
    pub fn get_replica_ids(&self) -> impl Iterator<Item = ReplicaId> + '_ {
        self.replicas.keys().copied()
    }

    /// Remove a replica by its identifier.
    pub fn remove_replica(&mut self, id: ReplicaId) {
        self.replicas.remove(&id);
        for (_id, (_, frontiers)) in self.uppers.iter_mut() {
            frontiers.remove(&id);
        }
    }

    /// Pipes the command history to the indicated replica, introducing new dataflow identifiers.
    fn hydrate_replica(&mut self, replica_id: ReplicaId) {
        // Zero out frontiers maintained by this replica.
        for (_id, (_, frontiers)) in self.uppers.iter_mut() {
            *frontiers.get_mut(&replica_id).unwrap() =
                timely::progress::frontier::MutableAntichain::new();
            frontiers
                .get_mut(&replica_id)
                .unwrap()
                .update_iter(Some((T::minimum(), 1)));
        }
        // Take this opportunity to clean up the history we should present.
        self.last_command_count = self.history.reduce(&self.peeks);

        // Replay the commands at the client, creating new dataflow identifiers.
        let (cmd_tx, _) = self.replicas.get_mut(&replica_id).unwrap();
        for command in self.history.iter() {
            let mut command = command.clone();
            specialize_command(&mut command, replica_id);

            cmd_tx
                .send(command)
                .expect("Channel to client has gone away!")
        }
    }
}

#[async_trait::async_trait]
impl<T> GenericClient<ComputeCommand<T>, ActiveReplicationResponse<T>> for ActiveReplication<T>
where
    T: timely::progress::Timestamp + differential_dataflow::lattice::Lattice + std::fmt::Debug,
{
    /// The ADAPTER layer's isolation from COMPUTE depends on the fact that this
    /// function is essentially non-blocking, i.e. the ADAPTER can blindly await
    /// calls to this function. This lets the ADAPTER continue operating even in
    /// the face of unhealthy or absent replicas.
    ///
    /// If this function ever becomes blocking (e.g. by making networking calls),
    /// the ADAPTER must amend its contract with COMPUTE.
    async fn send(&mut self, cmd: ComputeCommand<T>) -> Result<(), anyhow::Error> {
        // Update our tracking of peek commands.
        match &cmd {
            ComputeCommand::Peek(Peek { uuid, otel_ctx, .. }) => {
                self.peeks.insert(
                    *uuid,
                    PendingPeek {
                        // TODO(guswynn): can we just hold the `tracing::Span`
                        // here instead?
                        otel_ctx: otel_ctx.clone(),
                    },
                );
            }
            ComputeCommand::CancelPeeks { uuids } => {
                // Enqueue the response to the cancelation.
                self.pending_response.extend(uuids.iter().map(|uuid| {
                    // Canceled peeks should not be further responded to.
                    let otel_ctx = self
                        .peeks
                        .remove(uuid)
                        .map(|pending| pending.otel_ctx)
                        .unwrap_or_else(|| {
                            tracing::warn!("did not find pending peek for {}", uuid);
                            OpenTelemetryContext::empty()
                        });
                    ActiveReplicationResponse::ComputeResponse(ComputeResponse::PeekResponse(
                        *uuid,
                        PeekResponse::Canceled,
                        otel_ctx,
                    ))
                }));
            }
            _ => {}
        }

        // Initialize any necessary frontier tracking.
        let mut start = Vec::new();
        let mut cease = Vec::new();
        cmd.frontier_tracking(&mut start, &mut cease);
        for id in start.into_iter() {
            let frontier = timely::progress::Antichain::from_elem(T::minimum());
            let frontiers = self
                .replicas
                .keys()
                .map(|id| {
                    let mut frontier = timely::progress::frontier::MutableAntichain::new();
                    frontier.update_iter(Some((T::minimum(), 1)));
                    (id.clone(), frontier)
                })
                .collect();
            let previous = self.uppers.insert(id, (frontier, frontiers));
            assert!(previous.is_none());
        }
        for id in cease.into_iter() {
            let previous = self.uppers.remove(&id);
            assert!(previous.is_some());
        }

        // Record the command so that new replicas can be brought up to speed.
        self.history.push(cmd.clone());

        // If we have reached a point that justifies history reduction, do that.
        if self.history.len() > 2 * self.last_command_count {
            self.last_command_count = self.history.reduce(&self.peeks);
        }

        // Clone the command for each active replica.
        for (id, (tx, _)) in self.replicas.iter_mut() {
            let mut command = cmd.clone();
            specialize_command(&mut command, *id);

            // Errors are suppressed by this client, which awaits a reconnection
            // in `recv` and will rehydrate the client when that happens.
            //
            // NOTE: Broadcasting commands to replicas irrespective of their
            // presence or health is part of the isolation contract between
            // ADAPTER and COMPUTE. If this changes (e.g. awaiting responses
            // from replicas), ADAPTER needs to handle its interactions with
            // COMPUTE differently.
            let _ = tx.send(command);
        }

        Ok(())
    }

    async fn recv(&mut self) -> Result<Option<ActiveReplicationResponse<T>>, anyhow::Error> {
        // If we have a pending response, we should send it immediately.
        if let Some(response) = self.pending_response.pop_front() {
            return Ok(Some(response));
        }

        if self.replicas.is_empty() {
            // With no replicas, block forever to communicate that no result is ready.
            futures::future::pending().await
        } else {
            // We may need to iterate, if a replica needs rehydration.
            let mut clean_recv = false;
            while !clean_recv {
                let mut errored_replica = None;

                // Receive responses from any of the replicas, and take appropriate action.
                let mut stream: tokio_stream::StreamMap<_, _> = self
                    .replicas
                    .iter_mut()
                    .map(|(id, (_, rx))| (id.clone(), rx))
                    .collect();

                use futures::StreamExt;
                while let Some((replica_id, message)) = stream.next().await {
                    self.pending_response
                        .push_front(ActiveReplicationResponse::ReplicaHeartbeat(
                            replica_id,
                            Utc::now(),
                        ));
                    match message {
                        Ok(ComputeResponse::PeekResponse(uuid, response, otel_ctx)) => {
                            // If this is the first response, forward it; otherwise do not.
                            // TODO: we could collect the other responses to assert equivalence?
                            // Trades resources (memory) for reassurance; it is unclear which is best.
                            //
                            // NOTE: we use the `otel_ctx` from the response, not the
                            // pending peek, because we currently want the parent
                            // to be whatever the compute worker did with this peek.
                            //
                            // Additionally, we just use the `otel_ctx` from the first worker to
                            // respond.
                            if self.peeks.remove(&uuid).is_some() {
                                return Ok(Some(ActiveReplicationResponse::ComputeResponse(
                                    ComputeResponse::PeekResponse(uuid, response, otel_ctx),
                                )));
                            }
                        }
                        Ok(ComputeResponse::FrontierUppers(mut list)) => {
                            for (id, changes) in list.iter_mut() {
                                if let Some((frontier, frontiers)) = self.uppers.get_mut(id) {
                                    // Apply changes to replica `replica_id`
                                    frontiers
                                        .get_mut(&replica_id)
                                        .unwrap()
                                        .update_iter(changes.drain());
                                    // Recompute the unioned frontier: record a retraction of the
                                    // old overall frontier in `changes`, rebuild `frontier` as the
                                    // pointwise join of the old frontier with this replica's new
                                    // frontier, and then record the new frontier's insertion, so
                                    // that `changes` describes the net change to the overall frontier.
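                                    // For example (an illustrative sketch with integer timestamps):
                                    // if the overall frontier was `[4]` and this replica's frontier
                                    // advances to `[6]`, the rebuilt overall frontier is `[6]` and
                                    // `changes` compacts to `{(4, -1), (6, +1)}`. If a second replica
                                    // later also reports `[6]`, the join leaves `[6]` unchanged and
                                    // the corresponding `changes` compact to nothing.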
                                    changes.extend(frontier.iter().map(|t| (t.clone(), -1)));
                                    frontier.clear();
                                    for (time1, _neg_one) in changes.iter() {
                                        for time2 in frontiers[&replica_id].frontier().iter() {
                                            frontier.insert(time1.join(time2));
                                        }
                                    }
                                    changes.extend(frontier.iter().map(|t| (t.clone(), 1)));
                                    changes.compact();
                                }
                            }
                            if !list.is_empty() {
                                return Ok(Some(ActiveReplicationResponse::ComputeResponse(
                                    ComputeResponse::FrontierUppers(list),
                                )));
                            }
                        }
                        Ok(ComputeResponse::TailResponse(id, response)) => {
                            use crate::{TailBatch, TailResponse};
                            match response {
                                TailResponse::Batch(TailBatch {
                                    lower: _,
                                    upper,
                                    mut updates,
                                }) => {
                                    // It is sufficient to compare `upper` against the last reported
                                    // frontier for `id`: if `upper` is not less than or equal to that
                                    // frontier, some progress has happened. If so, we retain only the
                                    // updates at times greater than or equal to the last reported
                                    // frontier, and announce a batch from that frontier to its join
                                    // with `upper`.
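                                    // For example (an illustrative sketch with integer timestamps):
                                    // if the last reported frontier for `id` is `[5]` and a replica
                                    // sends a batch with `upper = [7]`, the join `[7]` differs from
                                    // `[5]`, so we forward the updates at times `>= [5]` as a batch
                                    // from `[5]` to `[7]`. If another replica later sends the same
                                    // batch, the join leaves `[7]` unchanged and the duplicate is
                                    // suppressed.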

                                    // Ensure that we have a recorded frontier ready to go.
                                    let entry = self
                                        .tails
                                        .entry(id)
                                        .or_insert_with(|| Antichain::from_elem(T::minimum()));
                                    // If the upper frontier has changed, we have a statement to make.
                                    // This happens exactly when some element of `entry` is not greater
                                    // than or equal to any element of `upper`.
                                    use differential_dataflow::lattice::Lattice;
                                    let new_upper = entry.join(&upper);
                                    if &new_upper != entry {
                                        let new_lower = entry.clone();
                                        entry.clone_from(&new_upper);
                                        updates.retain(|(time, _data, _diff)| {
                                            new_lower.less_equal(time)
                                        });
                                        return Ok(Some(
                                            ActiveReplicationResponse::ComputeResponse(
                                                ComputeResponse::TailResponse(
                                                    id,
                                                    TailResponse::Batch(TailBatch {
                                                        lower: new_lower,
                                                        upper: new_upper,
                                                        updates,
                                                    }),
                                                ),
                                            ),
                                        ));
                                    }
                                }
                                TailResponse::DroppedAt(frontier) => {
                                    // Introduce a new terminal frontier to suppress all future responses.
                                    // We cannot simply remove the entry, as we currently create new entries in response
                                    // to observed responses; if we pre-load the entries in response to commands we can
                                    // clean up the state here.
                                    self.tails.insert(id, Antichain::new());
                                    return Ok(Some(ActiveReplicationResponse::ComputeResponse(
                                        ComputeResponse::TailResponse(
                                            id,
                                            TailResponse::DroppedAt(frontier),
                                        ),
                                    )));
                                }
                            }
                        }
                        Err(_error) => {
                            errored_replica = Some(replica_id);
                            break;
                        }
                    }
                }
                drop(stream);

                if let Some(replica_id) = errored_replica {
                    tracing::warn!("Rehydrating replica {:?}", replica_id);
                    self.hydrate_replica(replica_id);
                }

                clean_recv = errored_replica.is_none();
            }
            // Indicate completion of the communication.
            Ok(None)
        }
    }
}

/// Specialize a command for the given `ReplicaId`.
///
/// Most `ComputeCommand`s are independent of the target replica, but some
/// contain replica-specific fields that must be adjusted before sending.
fn specialize_command<T>(command: &mut ComputeCommand<T>, replica_id: ReplicaId) {
    // Tell new instances their replica ID.
    if let ComputeCommand::CreateInstance(config) = command {
        config.replica_id = replica_id;
    }

    // Replace dataflow identifiers with new unique ids.
    if let ComputeCommand::CreateDataflows(dataflows) = command {
        for dataflow in dataflows.iter_mut() {
            dataflow.id = uuid::Uuid::new_v4();
        }
    }
}