mz_storage/storage_state/async_storage_worker.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A friendly companion async worker that can be used by a timely storage
//! worker to do work that requires async.
//!
//! CAUTION: This is not meant for high-throughput data processing but for
//! one-off requests that we need to do every now and then.

use std::collections::BTreeMap;
use std::fmt::Display;
use std::sync::Arc;
use std::thread::Thread;

use differential_dataflow::lattice::Lattice;
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::read::ListenEvent;
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{GlobalId, Row, TimestampManipulation};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::sinks::StorageSinkDesc;
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, KafkaSourceConnection,
    LoadGeneratorSourceConnection, MySqlSourceConnection, PostgresSourceConnection,
    SourceConnection, SourceData, SourceEnvelope, SourceTimestamp, SqlServerSource,
};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::mpsc;

use crate::source::types::SourceRender;

/// A worker that executes commands that come in on one channel and returns
/// responses on another. This is useful in places where we can't normally
/// run async code, such as the timely main loop.
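///
/// A minimal usage sketch (hypothetical, not taken from this crate): the
/// timely worker hands its own thread handle to the async worker, sends
/// commands, and drains responses from its main loop without blocking.
///
/// ```ignore
/// let worker = AsyncStorageWorker::new(std::thread::current(), persist_clients);
/// worker.update_ingestion_frontiers(id, ingestion_description);
/// // Later, in the timely main loop:
/// while let Ok(response) = worker.try_recv() {
///     // ... handle the response ...
/// }
/// ```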
#[derive(Debug)]
pub struct AsyncStorageWorker<T: Timestamp + Lattice + Codec64> {
    tx: mpsc::UnboundedSender<AsyncStorageWorkerCommand<T>>,
    rx: crossbeam_channel::Receiver<AsyncStorageWorkerResponse<T>>,
}

/// Commands for [AsyncStorageWorker].
#[derive(Debug)]
pub enum AsyncStorageWorkerCommand<T> {
    /// Calculate a recent resumption frontier for the ingestion.
    UpdateIngestionFrontiers(GlobalId, IngestionDescription<CollectionMetadata>),

    /// Calculate a recent resumption frontier for the sink.
    UpdateSinkFrontiers(GlobalId, StorageSinkDesc<CollectionMetadata, T>),

    /// This command is used to properly order the creation and dropping of
    /// dataflows. Currently, this is a no-op in [AsyncStorageWorker].
    ForwardDropDataflow(GlobalId),
}

/// Responses from [AsyncStorageWorker].
#[derive(Debug)]
pub enum AsyncStorageWorkerResponse<T: Timestamp + Lattice + Codec64> {
    /// An `IngestionDescription` with recent as-of and resume upper frontiers.
    IngestionFrontiersUpdated {
        /// ID of the ingestion/source.
        id: GlobalId,
        /// The description of the ingestion/source.
        ingestion_description: IngestionDescription<CollectionMetadata>,
        /// The frontier beyond which ingested updates should be uncompacted. Inputs to the
        /// ingestion are guaranteed to be readable at this frontier.
        as_of: Antichain<T>,
        /// A frontier in the Materialize time domain with the property that all updates not beyond
        /// it have already been durably ingested.
        resume_uppers: BTreeMap<GlobalId, Antichain<T>>,
        /// A frontier in the source time domain with the property that all updates not beyond it
        /// have already been durably ingested.
        source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
    },
    /// A `StorageSinkDesc` with a recent as-of frontier.
    ExportFrontiersUpdated {
        /// ID of the sink.
        id: GlobalId,
        /// The updated description of the sink.
        description: StorageSinkDesc<CollectionMetadata, T>,
    },

    /// Indicates that the dataflow can be dropped.
    DropDataflow(GlobalId),
}

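/// Reclocks the given resume uppers, which live in the `IntoTime` domain, into
/// the source's native `FromTime` domain by replaying the ingestion's remap
/// shard.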
async fn reclock_resume_uppers<C, IntoTime>(
    id: &GlobalId,
    persist_clients: &PersistClientCache,
    ingestion_description: &IngestionDescription<CollectionMetadata>,
    as_of: Antichain<IntoTime>,
    resume_uppers: &BTreeMap<GlobalId, Antichain<IntoTime>>,
) -> BTreeMap<GlobalId, Antichain<C::Time>>
where
    C: SourceConnection + SourceRender,
    IntoTime: Timestamp + TotalOrder + Lattice + Codec64 + Display + Sync,
{
    let metadata = &ingestion_description.ingestion_metadata;

    let persist_client = persist_clients
        .open(metadata.persist_location.clone())
        .await
        .expect("location unavailable");

    // We must load enough data in the timestamper to reclock all the requested frontiers.
    let mut remap_updates = vec![];
    let mut remap_upper = as_of.clone();
    let mut subscription = None;
    for upper in resume_uppers.values() {
        // TODO(petrosagg): this feels icky, we shouldn't have exceptions in frontier reasoning
        // unless there is a good explanation as to why it is the case. It seems to me that this is
        // because in various moments in ingestion we mix uppers and sinces and try to derive one
        // from the other. Investigate if we could explicitly track natively timestamped
        // since/uppers in the controller.
        if upper.is_empty() {
            continue;
        }

        while PartialOrder::less_than(&remap_upper, upper) {
            let subscription = match subscription.as_mut() {
                Some(subscription) => subscription,
                None => {
                    let read_handle = persist_client
                        .open_leased_reader::<SourceData, (), IntoTime, StorageDiff>(
                            metadata.remap_shard.clone().unwrap(),
                            Arc::new(ingestion_description.desc.connection.timestamp_desc()),
                            Arc::new(UnitSchema),
                            Diagnostics {
                                shard_name: ingestion_description.remap_collection_id.to_string(),
                                handle_purpose: format!("reclock for {}", id),
                            },
                            false,
                        )
                        .await
                        .expect("shard unavailable");

                    let sub = read_handle
                        .subscribe(as_of.clone())
                        .await
                        .expect("always valid to read at since");

                    subscription.insert(sub)
                }
            };
            for event in subscription.fetch_next().await {
                match event {
                    ListenEvent::Updates(updates) => {
                        for ((k, v), t, d) in updates {
                            let row: Row = k.expect("invalid binding").0.expect("invalid binding");
                            let _v: () = v.expect("invalid binding");
                            let from_ts = C::Time::decode_row(&row);
                            remap_updates.push((from_ts, t, d));
                        }
                    }
                    ListenEvent::Progress(f) => remap_upper = f,
                }
            }
        }
    }

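    // Sort the bindings by their `IntoTime` component so that the
    // `partition_point` call below can find, for any `IntoTime` frontier, the
    // prefix of bindings that are not beyond it.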
    remap_updates.sort_unstable_by(|a, b| a.1.cmp(&b.1));

    // The conversion of an IntoTime frontier to a FromTime frontier has the property that all
    // messages that would be reclocked to times beyond the provided `IntoTime` frontier will be
    // beyond the returned `FromTime` frontier. This can be used to compute a safe starting point
    // to resume producing an `IntoTime` collection at a particular frontier.
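    //
    // As a hypothetical illustration: if the remap shard records that the
    // source frontier advanced to {offset 3} at time 1 and to {offset 7} at
    // time 2, then reclocking the IntoTime frontier {2} accumulates only the
    // bindings at times not beyond {2} and yields the FromTime frontier
    // {offset 3}: every message at or past offset 3 would be reclocked to
    // time 2 or later.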
    let mut source_upper = MutableAntichain::new();
    let mut source_upper_at_frontier = move |upper: &Antichain<IntoTime>| {
        if PartialOrder::less_equal(upper, &as_of) {
            Antichain::from_elem(Timestamp::minimum())
        } else {
            let idx = remap_updates.partition_point(|(_, t, _)| !upper.less_equal(t));
            source_upper.clear();
            source_upper.update_iter(
                remap_updates[0..idx]
                    .iter()
                    .map(|(from_time, _, diff)| (from_time.clone(), *diff)),
            );
            source_upper.frontier().to_owned()
        }
    };

    let mut source_resume_uppers = BTreeMap::new();
    for (id, upper) in resume_uppers {
        let source_upper = source_upper_at_frontier(upper);
        source_resume_uppers.insert(*id, source_upper);
    }
    source_resume_uppers
}

impl<T: Timestamp + TimestampManipulation + Lattice + Codec64 + Display + Sync>
    AsyncStorageWorker<T>
{
    /// Creates a new [`AsyncStorageWorker`].
    ///
    /// IMPORTANT: The passed-in `thread` is unparked whenever new responses
    /// are added to the response channel. It is important not to park the
    /// thread that reads from this via [`try_recv`](Self::try_recv) while
    /// [`is_empty`](Self::is_empty) has returned `false`.
    pub fn new(thread: Thread, persist_clients: Arc<PersistClientCache>) -> Self {
        let (command_tx, mut command_rx) = mpsc::unbounded_channel();
        let (response_tx, response_rx) = crossbeam_channel::unbounded();

        let response_tx = ActivatingSender::new(response_tx, thread);

        mz_ore::task::spawn(|| "AsyncStorageWorker", async move {
            while let Some(command) = command_rx.recv().await {
                match command {
                    AsyncStorageWorkerCommand::UpdateIngestionFrontiers(
                        id,
                        ingestion_description,
                    ) => {
                        let mut resume_uppers = BTreeMap::new();

                        let seen_remap_shard = ingestion_description
                            .ingestion_metadata
                            .remap_shard
                            .expect("ingestions must have a remap shard");

                        for (id, export) in ingestion_description.source_exports.iter() {
                            // Explicit destructuring to force a compile error when the metadata changes.
                            let CollectionMetadata {
                                persist_location,
                                remap_shard,
                                data_shard,
                                relation_desc,
                                txns_shard,
                            } = &export.storage_metadata;
                            assert_eq!(
                                txns_shard, &None,
                                "source {} unexpectedly using txn-wal",
                                id
                            );
                            let client = persist_clients
                                .open(persist_location.clone())
                                .await
                                .expect("error creating persist client");

                            let mut write_handle = client
                                .open_writer::<SourceData, (), T, StorageDiff>(
                                    *data_shard,
                                    Arc::new(relation_desc.clone()),
                                    Arc::new(UnitSchema),
                                    Diagnostics {
                                        shard_name: id.to_string(),
                                        handle_purpose: format!("resumption data {}", id),
                                    },
                                )
                                .await
                                .unwrap();
                            let upper = write_handle.fetch_recent_upper().await;
                            let upper = match export.data_config.envelope {
                                // The CdcV2 envelope must re-ingest everything since the Mz
                                // frontier does not have a relation to upstream timestamps.
                                // TODO(petrosagg): move this reasoning to the controller
                                SourceEnvelope::CdcV2 if upper.is_empty() => Antichain::new(),
                                SourceEnvelope::CdcV2 => Antichain::from_elem(Timestamp::minimum()),
                                _ => upper.clone(),
                            };
                            resume_uppers.insert(*id, upper);
                            write_handle.expire().await;

                            if let Some(remap_shard) = remap_shard {
                                assert_eq!(
                                    seen_remap_shard, *remap_shard,
                                    "ingestion with multiple remap shards"
                                );
                            }
                        }

                        // Here we update the as-of frontier of the ingestion.
                        //
                        // The as-of frontier controls the frontier to which all inputs of the
                        // ingestion dataflow will be advanced. It is in our interest to set the
                        // as-of frontier to the largest possible value, which will result in the
                        // maximum amount of consolidation, which in turn results in the minimum
                        // amount of memory required to hydrate.
                        //
                        // For each output `o` and for each input `i` of the ingestion the
                        // controller guarantees that i.since < o.upper except when o.upper is
                        // [T::minimum()]. Therefore the largest as-of for a particular output `o`
                        // is `{ (t - 1).advance_by(i.since) | t in o.upper }`.
                        //
                        // To calculate the global as_of frontier we take the minimum of all those
                        // per-output as-of frontiers.
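                        //
                        // As a hypothetical numeric example: with o.upper = {5} and
                        // i.since = {3}, stepping back yields 4, and advancing 4 by {3}
                        // leaves it at 4; with i.since = {7} instead, advancing 4 by {7}
                        // lifts it to 7.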
                        let client = persist_clients
                            .open(
                                ingestion_description
                                    .ingestion_metadata
                                    .persist_location
                                    .clone(),
                            )
                            .await
                            .expect("error creating persist client");
                        let read_handle = client
                            .open_leased_reader::<SourceData, (), T, StorageDiff>(
                                seen_remap_shard,
                                Arc::new(ingestion_description.desc.connection.timestamp_desc()),
                                Arc::new(UnitSchema),
                                Diagnostics {
                                    shard_name: ingestion_description
                                        .remap_collection_id
                                        .to_string(),
                                    handle_purpose: format!("resumption data for {}", id),
                                },
                                false,
                            )
                            .await
                            .unwrap();
                        let remap_since = read_handle.since().clone();
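                        // Keep the read lease alive for a grace period instead of expiring
                        // it immediately, presumably so that the remap shard's since cannot
                        // advance past the as-of computed below before the ingestion
                        // dataflow installs its own read hold.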
                        mz_ore::task::spawn(move || "deferred_expire", async move {
                            tokio::time::sleep(std::time::Duration::from_secs(300)).await;
                            read_handle.expire().await;
                        });
                        let mut as_of = Antichain::new();
                        for upper in resume_uppers.values() {
                            for t in upper.elements() {
                                let mut t_prime = t.step_back().unwrap_or_else(T::minimum);
                                if !remap_since.is_empty() {
                                    t_prime.advance_by(remap_since.borrow());
                                    as_of.insert(t_prime);
                                }
                            }
                        }

                        /// Convenience function to convert `BTreeMap<GlobalId, Antichain<T>>` to
                        /// `BTreeMap<GlobalId, Vec<Row>>`.
                        fn to_vec_row<T: SourceTimestamp>(
                            uppers: BTreeMap<GlobalId, Antichain<T>>,
                        ) -> BTreeMap<GlobalId, Vec<Row>> {
                            uppers
                                .into_iter()
                                .map(|(id, upper)| {
                                    (id, upper.into_iter().map(|ts| ts.encode_row()).collect())
                                })
                                .collect()
                        }

                        // Dispatch on the concrete connection type so that we can call the
                        // generic reclocking method with the appropriate type parameter.
                        let source_resume_uppers = match ingestion_description.desc.connection {
                            GenericSourceConnection::Kafka(_) => {
                                let uppers = reclock_resume_uppers::<KafkaSourceConnection, _>(
                                    &id,
                                    &persist_clients,
                                    &ingestion_description,
                                    as_of.clone(),
                                    &resume_uppers,
                                )
                                .await;
                                to_vec_row(uppers)
                            }
                            GenericSourceConnection::Postgres(_) => {
                                let uppers = reclock_resume_uppers::<PostgresSourceConnection, _>(
                                    &id,
                                    &persist_clients,
                                    &ingestion_description,
                                    as_of.clone(),
                                    &resume_uppers,
                                )
                                .await;
                                to_vec_row(uppers)
                            }
                            GenericSourceConnection::MySql(_) => {
                                let uppers = reclock_resume_uppers::<MySqlSourceConnection, _>(
                                    &id,
                                    &persist_clients,
                                    &ingestion_description,
                                    as_of.clone(),
                                    &resume_uppers,
                                )
                                .await;
                                to_vec_row(uppers)
                            }
                            GenericSourceConnection::SqlServer(_) => {
                                let uppers = reclock_resume_uppers::<SqlServerSource, _>(
                                    &id,
                                    &persist_clients,
                                    &ingestion_description,
                                    as_of.clone(),
                                    &resume_uppers,
                                )
                                .await;
                                to_vec_row(uppers)
                            }
                            GenericSourceConnection::LoadGenerator(_) => {
                                let uppers =
                                    reclock_resume_uppers::<LoadGeneratorSourceConnection, _>(
                                        &id,
                                        &persist_clients,
                                        &ingestion_description,
                                        as_of.clone(),
                                        &resume_uppers,
                                    )
                                    .await;
                                to_vec_row(uppers)
                            }
                        };

                        let res = response_tx.send(
                            AsyncStorageWorkerResponse::IngestionFrontiersUpdated {
                                id,
                                ingestion_description,
                                as_of,
                                resume_uppers,
                                source_resume_uppers,
                            },
                        );

                        if let Err(_err) = res {
                            // Receiver must have hung up.
                            break;
                        }
                    }
                    AsyncStorageWorkerCommand::UpdateSinkFrontiers(id, mut description) => {
                        let metadata = description.to_storage_metadata.clone();
                        let client = persist_clients
                            .open(metadata.persist_location.clone())
                            .await
                            .expect("error creating persist client");

                        let mut write_handle = client
                            .open_writer::<SourceData, (), T, StorageDiff>(
                                metadata.data_shard,
                                Arc::new(metadata.relation_desc),
                                Arc::new(UnitSchema),
                                Diagnostics {
                                    shard_name: id.to_string(),
                                    handle_purpose: format!("resumption data {}", id),
                                },
                            )
                            .await
                            .unwrap();
                        // Choose an as-of frontier for this execution of the sink. If the write
                        // frontier of the sink is strictly larger than its read hold, it must have
                        // at least written out its snapshot, and we can skip reading it; otherwise
                        // assume we may have to replay from the beginning.
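                        //
                        // As a hypothetical example: with a write upper of {10} and an as-of
                        // of {3}, the stepped-back upper is {9}, and joining it with {3}
                        // leaves {9}; because {3} is strictly less than {10} the snapshot
                        // was already written, so `with_snapshot` is cleared and the new
                        // as-of becomes {9}.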
                        let upper = write_handle.fetch_recent_upper().await;
                        let mut read_hold = Antichain::from_iter(
                            upper
                                .iter()
                                .map(|t| t.step_back().unwrap_or_else(T::minimum)),
                        );
                        read_hold.join_assign(&description.as_of);
                        description.with_snapshot = description.with_snapshot
                            && !PartialOrder::less_than(&description.as_of, upper);
                        description.as_of = read_hold;
                        let res =
                            response_tx.send(AsyncStorageWorkerResponse::ExportFrontiersUpdated {
                                id,
                                description,
                            });

                        if let Err(_err) = res {
                            // Receiver must have hung up.
                            break;
                        }
                    }
                    AsyncStorageWorkerCommand::ForwardDropDataflow(id) => {
                        if let Err(_) =
                            response_tx.send(AsyncStorageWorkerResponse::DropDataflow(id))
                        {
                            // Receiver must have hung up.
                            break;
                        }
                    }
                }
            }
            tracing::trace!("shutting down async storage worker task");
        });

        Self {
            tx: command_tx,
            rx: response_rx,
        }
    }

    /// Updates the frontiers associated with the provided `IngestionDescription` to recent values.
    /// Currently this will calculate a fresh as-of for the ingestion and a fresh resumption
    /// frontier for each of the exports.
    pub fn update_ingestion_frontiers(
        &self,
        id: GlobalId,
        ingestion: IngestionDescription<CollectionMetadata>,
    ) {
        self.send(AsyncStorageWorkerCommand::UpdateIngestionFrontiers(
            id, ingestion,
        ))
    }

    /// Updates the frontiers associated with the provided `StorageSinkDesc` to recent values.
    /// Currently this will calculate a fresh as-of for the sink.
    pub fn update_sink_frontiers(
        &self,
        id: GlobalId,
        sink: StorageSinkDesc<CollectionMetadata, T>,
    ) {
        self.send(AsyncStorageWorkerCommand::UpdateSinkFrontiers(id, sink))
    }

    /// Enqueues a drop-dataflow command in the async storage worker channel to ensure
    /// proper ordering of the creation and dropping of dataflows.
    pub fn drop_dataflow(&self, id: GlobalId) {
        self.send(AsyncStorageWorkerCommand::ForwardDropDataflow(id))
    }

    fn send(&self, cmd: AsyncStorageWorkerCommand<T>) {
        self.tx
            .send(cmd)
            .expect("persist worker exited while its handle was alive")
    }

    /// Attempts to receive a message from the worker without blocking.
    ///
    /// This internally does a `try_recv` on a channel.
    pub fn try_recv(
        &self,
    ) -> Result<AsyncStorageWorkerResponse<T>, crossbeam_channel::TryRecvError> {
        self.rx.try_recv()
    }

    /// Returns `true` if there are currently no responses.
    pub fn is_empty(&self) -> bool {
        self.rx.is_empty()
    }
}

/// Helper that makes sure that we always unpark the target thread when we send a
/// message.
struct ActivatingSender<T> {
    tx: crossbeam_channel::Sender<T>,
    thread: Thread,
}

impl<T> ActivatingSender<T> {
    fn new(tx: crossbeam_channel::Sender<T>, thread: Thread) -> Self {
        Self { tx, thread }
    }

    fn send(&self, message: T) -> Result<(), crossbeam_channel::SendError<T>> {
        let res = self.tx.send(message);
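        // Send before unparking so that the woken thread is guaranteed to
        // observe the message when it checks the channel.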
        self.thread.unpark();
        res
    }
}