mz_storage/storage_state/async_storage_worker.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A friendly companion async worker that can be used by a timely storage
//! worker to do work that requires async.
//!
//! CAUTION: This is not meant for high-throughput data processing but for
//! one-off requests that we need to do every now and then.

use std::collections::BTreeMap;
use std::fmt::Display;
use std::sync::Arc;
use std::thread::Thread;

use differential_dataflow::lattice::Lattice;
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::read::ListenEvent;
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{GlobalId, Row, TimestampManipulation};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, KafkaSourceConnection,
    LoadGeneratorSourceConnection, MySqlSourceConnection, PostgresSourceConnection,
    SourceConnection, SourceData, SourceEnvelope, SourceTimestamp, SqlServerSourceConnection,
};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::mpsc;

use crate::source::types::SourceRender;

/// A worker that can execute commands that come in on a channel and returns
/// responses on another channel. This is useful in places where we can't
/// normally run async code, such as the timely main loop.
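///
/// A minimal usage sketch (not a doctest; `persist_clients`, `ingestion_id`, `ingestion`, and
/// `handle_response` are placeholders for caller-provided values), assuming the reading thread
/// parks itself between activations:
///
/// ```ignore
/// // Hand the worker a handle to the current (timely) thread so it can wake us up.
/// let worker = AsyncStorageWorker::new(std::thread::current(), persist_clients);
/// worker.update_ingestion_frontiers(ingestion_id, ingestion);
///
/// loop {
///     // Drain every response that is currently available, without blocking.
///     while let Ok(response) = worker.try_recv() {
///         handle_response(response);
///     }
///     // Only park once the channel is drained; the worker unparks this thread
///     // whenever it sends a new response.
///     if worker.is_empty() {
///         std::thread::park();
///     }
/// }
/// ```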
#[derive(Debug)]
pub struct AsyncStorageWorker<T: Timestamp + Lattice + Codec64> {
    tx: mpsc::UnboundedSender<AsyncStorageWorkerCommand<T>>,
    rx: crossbeam_channel::Receiver<AsyncStorageWorkerResponse<T>>,
}

/// Commands for [AsyncStorageWorker].
#[derive(Debug)]
pub enum AsyncStorageWorkerCommand<T> {
    /// Calculate a recent resumption frontier for the ingestion.
    UpdateIngestionFrontiers(GlobalId, IngestionDescription<CollectionMetadata>),

    /// Calculate a recent resumption frontier for the sink.
    UpdateSinkFrontiers(GlobalId, StorageSinkDesc<CollectionMetadata, T>),

    /// This command is used to properly order the creation and dropping of dataflows.
    /// Currently, this is a no-op in AsyncStorageWorker.
    ForwardDropDataflow(GlobalId),
}

/// Responses from [AsyncStorageWorker].
#[derive(Debug)]
pub enum AsyncStorageWorkerResponse<T: Timestamp + Lattice + Codec64> {
    /// An `IngestionDescription` with recent as-of and resume upper frontiers.
    IngestionFrontiersUpdated {
        /// ID of the ingestion/source.
        id: GlobalId,
        /// The description of the ingestion/source.
        ingestion_description: IngestionDescription<CollectionMetadata>,
        /// The frontier beyond which ingested updates should be uncompacted. Inputs to the
        /// ingestion are guaranteed to be readable at this frontier.
        as_of: Antichain<T>,
        /// A frontier in the Materialize time domain with the property that all updates not beyond
        /// it have already been durably ingested.
        resume_uppers: BTreeMap<GlobalId, Antichain<T>>,
        /// A frontier in the source time domain with the property that all updates not beyond it
        /// have already been durably ingested.
        source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
    },
    /// A `StorageSinkDesc` with recent as-of frontier.
    ExportFrontiersUpdated {
        /// ID of the sink.
        id: GlobalId,
        /// The updated description of the sink.
        description: StorageSinkDesc<CollectionMetadata, T>,
    },

    /// Indicates that the dataflow can be dropped.
    DropDataflow(GlobalId),
}

async fn reclock_resume_uppers<C, IntoTime>(
    id: &GlobalId,
    persist_clients: &PersistClientCache,
    ingestion_description: &IngestionDescription<CollectionMetadata>,
    as_of: Antichain<IntoTime>,
    resume_uppers: &BTreeMap<GlobalId, Antichain<IntoTime>>,
) -> BTreeMap<GlobalId, Antichain<C::Time>>
where
    C: SourceConnection + SourceRender,
    IntoTime: Timestamp + TotalOrder + Lattice + Codec64 + Display + Sync,
{
    let remap_metadata = &ingestion_description.remap_metadata;

    let persist_client = persist_clients
        .open(remap_metadata.persist_location.clone())
        .await
        .expect("location unavailable");

    // We must load enough data in the timestamper to reclock all the requested frontiers
    let mut remap_updates = vec![];
    let mut remap_upper = as_of.clone();
    let mut subscription = None;
    for upper in resume_uppers.values() {
        // TODO(petrosagg): this feels icky, we shouldn't have exceptions in frontier reasoning
        // unless there is a good explanation as to why it is the case. It seems to me that this is
        // because in various moments in ingestion we mix uppers and sinces and try to derive one
        // from the other. Investigate if we could explicitly track natively timestamped
        // since/uppers in the controller.
        if upper.is_empty() {
            continue;
        }

        while PartialOrder::less_than(&remap_upper, upper) {
            let subscription = match subscription.as_mut() {
                Some(subscription) => subscription,
                None => {
                    let read_handle = persist_client
                        .open_leased_reader::<SourceData, (), IntoTime, StorageDiff>(
                            remap_metadata.data_shard.clone(),
                            Arc::new(remap_metadata.relation_desc.clone()),
                            Arc::new(UnitSchema),
                            Diagnostics {
                                shard_name: ingestion_description.remap_collection_id.to_string(),
                                handle_purpose: format!("reclock for {}", id),
                            },
                            false,
                        )
                        .await
                        .expect("shard unavailable");

                    let sub = read_handle
                        .subscribe(as_of.clone())
                        .await
                        .expect("always valid to read at since");

                    subscription.insert(sub)
                }
            };
            for event in subscription.fetch_next().await {
                match event {
                    ListenEvent::Updates(updates) => {
                        for ((k, v), t, d) in updates {
                            let row: Row = k.expect("invalid binding").0.expect("invalid binding");
                            let _v: () = v.expect("invalid binding");
                            let from_ts = C::Time::decode_row(&row);
                            remap_updates.push((from_ts, t, d));
                        }
                    }
                    ListenEvent::Progress(f) => remap_upper = f,
                }
            }
        }
    }

    remap_updates.sort_unstable_by(|a, b| a.1.cmp(&b.1));

    // The conversion of an IntoTime frontier to a FromTime frontier has the property that all
    // messages that would be reclocked to times beyond the provided `IntoTime` frontier will be
    // beyond the returned `FromTime` frontier. This can be used to compute a safe starting point
    // to resume producing an `IntoTime` collection at a particular frontier.
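    //
    // A hypothetical illustration (Kafka-style offsets, totally ordered IntoTime): if the sorted
    // bindings advance the source frontier to offset 100 at IntoTime 5 and to offset 250 at
    // IntoTime 10, then for an IntoTime upper of {7} only the bindings at times not beyond {7}
    // are accumulated, yielding a FromTime frontier of {100}. Every message at offset 100 or
    // later reclocks to IntoTime 10 or later, which is indeed beyond {7}.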
    let mut source_upper = MutableAntichain::new();
    let mut source_upper_at_frontier = move |upper: &Antichain<IntoTime>| {
        if PartialOrder::less_equal(upper, &as_of) {
            Antichain::from_elem(Timestamp::minimum())
        } else {
            let idx = remap_updates.partition_point(|(_, t, _)| !upper.less_equal(t));
            source_upper.clear();
            source_upper.update_iter(
                remap_updates[0..idx]
                    .iter()
                    .map(|(from_time, _, diff)| (from_time.clone(), *diff)),
            );
            source_upper.frontier().to_owned()
        }
    };

    let mut source_resume_uppers = BTreeMap::new();
    for (id, upper) in resume_uppers {
        let source_upper = source_upper_at_frontier(upper);
        source_resume_uppers.insert(*id, source_upper);
    }
    source_resume_uppers
}

impl<T: Timestamp + TimestampManipulation + Lattice + Codec64 + Display + Sync>
    AsyncStorageWorker<T>
{
    /// Creates a new [`AsyncStorageWorker`].
    ///
    /// IMPORTANT: The passed-in `thread` is unparked whenever new responses
    /// are added to the response channel. The thread that reads from this worker
    /// via [`try_recv`](Self::try_recv) must not go to sleep while
    /// [`is_empty`](Self::is_empty) has returned `false`.
    pub fn new(thread: Thread, persist_clients: Arc<PersistClientCache>) -> Self {
        let (command_tx, mut command_rx) = mpsc::unbounded_channel();
        let (response_tx, response_rx) = crossbeam_channel::unbounded();

        let response_tx = ActivatingSender::new(response_tx, thread);

        mz_ore::task::spawn(|| "AsyncStorageWorker", async move {
            while let Some(command) = command_rx.recv().await {
                match command {
                    AsyncStorageWorkerCommand::UpdateIngestionFrontiers(
                        id,
                        ingestion_description,
                    ) => {
                        let mut resume_uppers = BTreeMap::new();

                        for (id, export) in ingestion_description.source_exports.iter() {
                            // Explicit destructuring to force a compile error when the metadata changes
                            let CollectionMetadata {
                                persist_location,
                                data_shard,
                                relation_desc,
                                txns_shard,
                            } = &export.storage_metadata;
                            assert_eq!(
                                txns_shard, &None,
                                "source {} unexpectedly using txn-wal",
                                id
                            );
                            let client = persist_clients
                                .open(persist_location.clone())
                                .await
                                .expect("error creating persist client");

                            let mut write_handle = client
                                .open_writer::<SourceData, (), T, StorageDiff>(
                                    *data_shard,
                                    Arc::new(relation_desc.clone()),
                                    Arc::new(UnitSchema),
                                    Diagnostics {
                                        shard_name: id.to_string(),
                                        handle_purpose: format!("resumption data {}", id),
                                    },
                                )
                                .await
                                .unwrap();
                            let upper = write_handle.fetch_recent_upper().await;
                            let upper = match export.data_config.envelope {
                                // The CdcV2 envelope must re-ingest everything since the Mz frontier does not have a relation to upstream timestamps.
                                // TODO(petrosagg): move this reasoning to the controller
                                SourceEnvelope::CdcV2 if upper.is_empty() => Antichain::new(),
                                SourceEnvelope::CdcV2 => Antichain::from_elem(Timestamp::minimum()),
                                _ => upper.clone(),
                            };
                            resume_uppers.insert(*id, upper);
                            write_handle.expire().await;
                        }

                        // Here we update the as-of frontier of the ingestion.
                        //
                        // The as-of frontier controls the frontier to which all inputs of the
                        // ingestion dataflow will be advanced. It is in our interest to set the
                        // as-of frontier to the largest possible value, which will result in the
                        // maximum amount of consolidation, which in turn results in the minimum
                        // amount of memory required to hydrate.
                        //
                        // For each output `o` and for each input `i` of the ingestion the
                        // controller guarantees that i.since < o.upper except when o.upper is
                        // [T::minimum()]. Therefore the largest as-of for a particular output `o`
                        // is `{ (t - 1).advance_by(i.since) | t in o.upper }`.
                        //
                        // To calculate the global as-of frontier we take the minimum of all those
                        // per-output as-of frontiers.
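                        //
                        // A hypothetical example with totally ordered timestamps: if an output has
                        // o.upper = {10} and the remap shard's since is {4}, the largest safe
                        // as-of element for that output is max(10 - 1, 4) = 9; if instead
                        // o.upper = {3} and the since is {7}, it is max(3 - 1, 7) = 7.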
                        let client = persist_clients
                            .open(
                                ingestion_description
                                    .remap_metadata
                                    .persist_location
                                    .clone(),
                            )
                            .await
                            .expect("error creating persist client");
                        let read_handle = client
                            .open_leased_reader::<SourceData, (), T, StorageDiff>(
                                ingestion_description.remap_metadata.data_shard,
                                Arc::new(
                                    ingestion_description.remap_metadata.relation_desc.clone(),
                                ),
                                Arc::new(UnitSchema),
                                Diagnostics {
                                    shard_name: ingestion_description
                                        .remap_collection_id
                                        .to_string(),
                                    handle_purpose: format!("resumption data for {}", id),
                                },
                                false,
                            )
                            .await
                            .unwrap();
                        let remap_since = read_handle.since().clone();
                        mz_ore::task::spawn(move || "deferred_expire", async move {
                            tokio::time::sleep(std::time::Duration::from_secs(300)).await;
                            read_handle.expire().await;
                        });
                        let mut as_of = Antichain::new();
                        for upper in resume_uppers.values() {
                            for t in upper.elements() {
                                let mut t_prime = t.step_back().unwrap_or_else(T::minimum);
                                if !remap_since.is_empty() {
                                    t_prime.advance_by(remap_since.borrow());
                                    as_of.insert(t_prime);
                                }
                            }
                        }

                        /// Convenience function to convert `BTreeMap<GlobalId, Antichain<C>>` to
                        /// `BTreeMap<GlobalId, Vec<Row>>`.
                        fn to_vec_row<T: SourceTimestamp>(
                            uppers: BTreeMap<GlobalId, Antichain<T>>,
                        ) -> BTreeMap<GlobalId, Vec<Row>> {
                            uppers
                                .into_iter()
                                .map(|(id, upper)| {
                                    (id, upper.into_iter().map(|ts| ts.encode_row()).collect())
                                })
                                .collect()
                        }

                        // Create a specialized description to be able to call the generic method
                        let source_resume_uppers = match ingestion_description.desc.connection {
                            GenericSourceConnection::Kafka(_) => {
                                let uppers = reclock_resume_uppers::<KafkaSourceConnection, _>(
                                    &id,
                                    &persist_clients,
                                    &ingestion_description,
                                    as_of.clone(),
                                    &resume_uppers,
                                )
                                .await;
                                to_vec_row(uppers)
                            }
                            GenericSourceConnection::Postgres(_) => {
                                let uppers = reclock_resume_uppers::<PostgresSourceConnection, _>(
                                    &id,
                                    &persist_clients,
                                    &ingestion_description,
                                    as_of.clone(),
                                    &resume_uppers,
                                )
                                .await;
                                to_vec_row(uppers)
                            }
                            GenericSourceConnection::MySql(_) => {
                                let uppers = reclock_resume_uppers::<MySqlSourceConnection, _>(
                                    &id,
                                    &persist_clients,
                                    &ingestion_description,
                                    as_of.clone(),
                                    &resume_uppers,
                                )
                                .await;
                                to_vec_row(uppers)
                            }
                            GenericSourceConnection::SqlServer(_) => {
                                let uppers = reclock_resume_uppers::<SqlServerSourceConnection, _>(
                                    &id,
                                    &persist_clients,
                                    &ingestion_description,
                                    as_of.clone(),
                                    &resume_uppers,
                                )
                                .await;
                                to_vec_row(uppers)
                            }
                            GenericSourceConnection::LoadGenerator(_) => {
                                let uppers =
                                    reclock_resume_uppers::<LoadGeneratorSourceConnection, _>(
                                        &id,
                                        &persist_clients,
                                        &ingestion_description,
                                        as_of.clone(),
                                        &resume_uppers,
                                    )
                                    .await;
                                to_vec_row(uppers)
                            }
                        };

                        let res = response_tx.send(
                            AsyncStorageWorkerResponse::IngestionFrontiersUpdated {
                                id,
                                ingestion_description,
                                as_of,
                                resume_uppers,
                                source_resume_uppers,
                            },
                        );

                        if let Err(_err) = res {
                            // Receiver must have hung up.
                            break;
                        }
                    }
                    AsyncStorageWorkerCommand::UpdateSinkFrontiers(id, mut description) => {
                        let metadata = description.to_storage_metadata.clone();
                        let client = persist_clients
                            .open(metadata.persist_location.clone())
                            .await
                            .expect("error creating persist client");

                        let mut write_handle = client
                            .open_writer::<SourceData, (), T, StorageDiff>(
                                metadata.data_shard,
                                Arc::new(metadata.relation_desc),
                                Arc::new(UnitSchema),
                                Diagnostics {
                                    shard_name: id.to_string(),
                                    handle_purpose: format!("resumption data {}", id),
                                },
                            )
                            .await
                            .unwrap();
                        // Choose an as-of frontier for this execution of the sink. If the write
                        // frontier of the sink is strictly larger than its read hold, it must have
                        // at least written out its snapshot, and we can skip reading it; otherwise
                        // assume we may have to replay from the beginning.
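                        //
                        // A hypothetical example: with upper = {20} and description.as_of = {15},
                        // the write frontier is strictly beyond the read hold, so the snapshot was
                        // already written; `with_snapshot` becomes false and the new as-of is
                        // max(20 - 1, 15) = 19. With upper = {15} and as_of = {15}, the snapshot
                        // may still be outstanding, so `with_snapshot` is left as requested and
                        // the as-of stays at 15.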
                        let upper = write_handle.fetch_recent_upper().await;
                        let mut read_hold = Antichain::from_iter(
                            upper
                                .iter()
                                .map(|t| t.step_back().unwrap_or_else(T::minimum)),
                        );
                        read_hold.join_assign(&description.as_of);
                        description.with_snapshot = description.with_snapshot
                            && !PartialOrder::less_than(&description.as_of, upper);
                        description.as_of = read_hold;
                        let res =
                            response_tx.send(AsyncStorageWorkerResponse::ExportFrontiersUpdated {
                                id,
                                description,
                            });

                        if let Err(_err) = res {
                            // Receiver must have hung up.
                            break;
                        }
                    }
                    AsyncStorageWorkerCommand::ForwardDropDataflow(id) => {
                        if let Err(_) =
                            response_tx.send(AsyncStorageWorkerResponse::DropDataflow(id))
                        {
                            // Receiver must have hung up.
                            break;
                        }
                    }
                }
            }
            tracing::trace!("shutting down async storage worker task");
        });

        Self {
            tx: command_tx,
            rx: response_rx,
        }
    }

    /// Updates the frontiers associated with the provided `IngestionDescription` to recent values.
    /// Currently this will calculate a fresh as-of for the ingestion and a fresh resumption
    /// frontier for each of the exports.
    pub fn update_ingestion_frontiers(
        &self,
        id: GlobalId,
        ingestion: IngestionDescription<CollectionMetadata>,
    ) {
        self.send(AsyncStorageWorkerCommand::UpdateIngestionFrontiers(
            id, ingestion,
        ))
    }

    /// Updates the frontiers associated with the provided `StorageSinkDesc` to recent values.
    /// Currently this will calculate a fresh as-of for the sink.
    pub fn update_sink_frontiers(
        &self,
        id: GlobalId,
        sink: StorageSinkDesc<CollectionMetadata, T>,
    ) {
        self.send(AsyncStorageWorkerCommand::UpdateSinkFrontiers(id, sink))
    }

    /// Enqueues a drop-dataflow command in the async storage worker channel to ensure proper
    /// ordering of creating and dropping dataflows.
    pub fn drop_dataflow(&self, id: GlobalId) {
        self.send(AsyncStorageWorkerCommand::ForwardDropDataflow(id))
    }

    fn send(&self, cmd: AsyncStorageWorkerCommand<T>) {
        self.tx
            .send(cmd)
            .expect("persist worker exited while its handle was alive")
    }

    /// Attempts to receive a message from the worker without blocking.
    ///
    /// This internally does a `try_recv` on a channel.
    pub fn try_recv(
        &self,
    ) -> Result<AsyncStorageWorkerResponse<T>, crossbeam_channel::TryRecvError> {
        self.rx.try_recv()
    }

    /// Returns `true` if there are currently no responses.
    pub fn is_empty(&self) -> bool {
        self.rx.is_empty()
    }
}

/// Helper that makes sure that we always unpark the target thread when we send a
/// message.
struct ActivatingSender<T> {
    tx: crossbeam_channel::Sender<T>,
    thread: Thread,
}

impl<T> ActivatingSender<T> {
    fn new(tx: crossbeam_channel::Sender<T>, thread: Thread) -> Self {
        Self { tx, thread }
    }

    fn send(&self, message: T) -> Result<(), crossbeam_channel::SendError<T>> {
        let res = self.tx.send(message);
        self.thread.unpark();
        res
    }
}