mz_persist_client/write.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Write capabilities and handles
11
12use std::borrow::Borrow;
13use std::fmt::Debug;
14use std::sync::Arc;
15
16use differential_dataflow::difference::Semigroup;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::trace::Description;
19use futures::StreamExt;
20use futures::stream::FuturesUnordered;
21use mz_dyncfg::Config;
22use mz_ore::instrument;
23use mz_ore::task::RuntimeExt;
24use mz_persist::location::Blob;
25use mz_persist_types::schema::SchemaId;
26use mz_persist_types::{Codec, Codec64};
27use mz_proto::{IntoRustIfSome, ProtoType};
28use proptest_derive::Arbitrary;
29use semver::Version;
30use serde::{Deserialize, Serialize};
31use timely::PartialOrder;
32use timely::progress::{Antichain, Timestamp};
33use tokio::runtime::Handle;
34use tracing::{Instrument, debug_span, info, warn};
35use uuid::Uuid;
36
37use crate::batch::{
38    Added, BATCH_DELETE_ENABLED, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
39    BatchParts, ProtoBatch, validate_truncate_batch,
40};
41use crate::error::{InvalidUsage, UpperMismatch};
42use crate::fetch::{EncodedPart, FetchBatchFilter, FetchedPart, PartDecodeFormat};
43use crate::internal::compact::{CompactConfig, Compactor};
44use crate::internal::encoding::{Schemas, check_data_version};
45use crate::internal::machine::{CompareAndAppendRes, ExpireFn, Machine};
46use crate::internal::metrics::Metrics;
47use crate::internal::state::{BatchPart, HandleDebugState, HollowBatch, RunOrder, RunPart};
48use crate::read::ReadHandle;
49use crate::schema::PartMigration;
50use crate::{GarbageCollector, IsolatedRuntime, PersistConfig, ShardId, parse_id};
51
52pub(crate) const COMBINE_INLINE_WRITES: Config<bool> = Config::new(
53    "persist_write_combine_inline_writes",
54    true,
55    "If set, re-encode inline writes if they don't fit into the batch metadata limits.",
56);
57
58/// An opaque identifier for a writer of a persist durable TVC (aka shard).
59#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
60#[serde(try_from = "String", into = "String")]
61pub struct WriterId(pub(crate) [u8; 16]);
62
63impl std::fmt::Display for WriterId {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        write!(f, "w{}", Uuid::from_bytes(self.0))
66    }
67}
68
69impl std::fmt::Debug for WriterId {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        write!(f, "WriterId({})", Uuid::from_bytes(self.0))
72    }
73}
74
75impl std::str::FromStr for WriterId {
76    type Err = String;
77
78    fn from_str(s: &str) -> Result<Self, Self::Err> {
79        parse_id('w', "WriterId", s).map(WriterId)
80    }
81}
82
83impl From<WriterId> for String {
84    fn from(writer_id: WriterId) -> Self {
85        writer_id.to_string()
86    }
87}
88
89impl TryFrom<String> for WriterId {
90    type Error = String;
91
92    fn try_from(s: String) -> Result<Self, Self::Error> {
93        s.parse()
94    }
95}
96
97impl WriterId {
98    pub(crate) fn new() -> Self {
99        WriterId(*Uuid::new_v4().as_bytes())
100    }
101}
102
103/// A "capability" granting the ability to apply updates to some shard at times
104/// greater or equal to `self.upper()`.
105///
106/// All async methods on WriteHandle retry for as long as they are able, but the
107/// returned [std::future::Future]s implement "cancel on drop" semantics. This
108/// means that callers can add a timeout using [tokio::time::timeout] or
109/// [tokio::time::timeout_at].
110///
111/// ```rust,no_run
112/// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
113/// # let timeout: std::time::Duration = unimplemented!();
114/// # async {
115/// tokio::time::timeout(timeout, write.fetch_recent_upper()).await
116/// # };
117/// ```
118#[derive(Debug)]
119pub struct WriteHandle<K: Codec, V: Codec, T, D> {
120    pub(crate) cfg: PersistConfig,
121    pub(crate) metrics: Arc<Metrics>,
122    pub(crate) machine: Machine<K, V, T, D>,
123    pub(crate) gc: GarbageCollector<K, V, T, D>,
124    pub(crate) compact: Option<Compactor<K, V, T, D>>,
125    pub(crate) blob: Arc<dyn Blob>,
126    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
127    pub(crate) writer_id: WriterId,
128    pub(crate) debug_state: HandleDebugState,
129    pub(crate) write_schemas: Schemas<K, V>,
130
131    pub(crate) upper: Antichain<T>,
132    expire_fn: Option<ExpireFn>,
133}
134
135impl<K, V, T, D> WriteHandle<K, V, T, D>
136where
137    K: Debug + Codec,
138    V: Debug + Codec,
139    T: Timestamp + Lattice + Codec64 + Sync,
140    D: Semigroup + Ord + Codec64 + Send + Sync,
141{
142    pub(crate) fn new(
143        cfg: PersistConfig,
144        metrics: Arc<Metrics>,
145        machine: Machine<K, V, T, D>,
146        gc: GarbageCollector<K, V, T, D>,
147        blob: Arc<dyn Blob>,
148        writer_id: WriterId,
149        purpose: &str,
150        write_schemas: Schemas<K, V>,
151    ) -> Self {
152        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
153        let compact = cfg.compaction_enabled.then(|| {
154            Compactor::new(
155                cfg.clone(),
156                Arc::clone(&metrics),
157                write_schemas.clone(),
158                gc.clone(),
159            )
160        });
161        let debug_state = HandleDebugState {
162            hostname: cfg.hostname.to_owned(),
163            purpose: purpose.to_owned(),
164        };
165        let upper = machine.applier.clone_upper();
166        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), writer_id.clone());
167        WriteHandle {
168            cfg,
169            metrics,
170            machine,
171            gc,
172            compact,
173            blob,
174            isolated_runtime,
175            writer_id,
176            debug_state,
177            write_schemas,
178            upper,
179            expire_fn: Some(expire_fn),
180        }
181    }
182
183    /// Creates a [WriteHandle] for the same shard from an existing
184    /// [ReadHandle].
185    pub fn from_read(read: &ReadHandle<K, V, T, D>, purpose: &str) -> Self {
186        Self::new(
187            read.cfg.clone(),
188            Arc::clone(&read.metrics),
189            read.machine.clone(),
190            read.gc.clone(),
191            Arc::clone(&read.blob),
192            WriterId::new(),
193            purpose,
194            read.read_schemas.clone(),
195        )
196    }
197
198    /// This handle's shard id.
199    pub fn shard_id(&self) -> ShardId {
200        self.machine.shard_id()
201    }
202
203    /// Returns the ID of this writer's schema, if any.
204    pub fn schema_id(&self) -> Option<SchemaId> {
205        self.write_schemas.id
206    }
207
208    /// A cached version of the shard-global `upper` frontier.
209    ///
210    /// This is the most recent upper discovered by this handle. It is
211    /// potentially more stale than [Self::shared_upper] but is lock-free and
212    /// allocation-free. This will always be less than or equal to the shard-global
213    /// `upper`.
214    pub fn upper(&self) -> &Antichain<T> {
215        &self.upper
216    }
217
218    /// A less-stale cached version of the shard-global `upper` frontier.
219    ///
220    /// This is the most recently known upper for this shard process-wide, but
221    /// unlike [Self::upper] it requires a mutex and a clone. This will always be
222    /// less than or equal to the shard-global `upper`.
223    pub fn shared_upper(&self) -> Antichain<T> {
224        self.machine.applier.clone_upper()
225    }
226
227    /// Fetches and returns a recent shard-global `upper`. Importantly, this operation is
228    /// linearized with write operations.
229    ///
230    /// This requires fetching the latest state from consensus and is therefore a potentially
231    /// expensive operation.
232    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
233    pub async fn fetch_recent_upper(&mut self) -> &Antichain<T> {
234        // TODO: Do we even need to track self.upper on WriteHandle or could
235        // WriteHandle::upper just get the one out of machine?
236        self.machine
237            .applier
238            .fetch_upper(|current_upper| self.upper.clone_from(current_upper))
239            .await;
240        &self.upper
241    }
242
243    /// Applies `updates` to this shard and downgrades this handle's upper to
244    /// `upper`.
245    ///
246    /// The innermost `Result` is `Ok` if the updates were successfully written.
247    /// If not, an `Upper` err containing the current writer upper is returned.
248    /// If that happens, we also update our local `upper` to match the current
249    /// upper. This is useful in cases where a timeout happens in between a
250    /// successful write and returning that to the client.
251    ///
252    /// In contrast to [Self::compare_and_append], multiple [WriteHandle]s may
253    /// be used concurrently to write to the same shard, but in this case, the
254    /// data being written must be identical (in the sense of "definite"-ness).
255    /// It's intended for replicated use by source ingestion, sinks, etc.
256    ///
257    /// All times in `updates` must be greater than or equal to `lower` and not
258    /// greater than or equal to `upper`. An `upper` of the empty antichain "finishes"
259    /// this shard, promising that no more data is ever incoming.
260    ///
261    /// `updates` may be empty, which allows for downgrading `upper` to
262    /// communicate progress. It is possible to call this with `upper` equal to
263    /// `self.upper()` and an empty `updates` (making the call a no-op).
264    ///
265    /// This uses a bounded amount of memory, even when `updates` is very large.
266    /// Individual records, however, should be small enough that we can
267    /// reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to
268    /// us.
269    ///
270    /// The clunky multi-level Result is to enable more obvious error handling
271    /// in the caller. See <http://sled.rs/errors.html> for details.
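    ///
    /// A minimal usage sketch (the key/value types and the frontier values here
    /// are illustrative, not prescriptive):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// // One update at time 0, written into the interval [0, 5).
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 0u64, 1i64)];
    /// write
    ///     .append(
    ///         updates.iter(),
    ///         timely::progress::Antichain::from_elem(0),
    ///         timely::progress::Antichain::from_elem(5),
    ///     )
    ///     .await
    /// # };
    /// ```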
272    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
273    pub async fn append<SB, KB, VB, TB, DB, I>(
274        &mut self,
275        updates: I,
276        lower: Antichain<T>,
277        upper: Antichain<T>,
278    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
279    where
280        SB: Borrow<((KB, VB), TB, DB)>,
281        KB: Borrow<K>,
282        VB: Borrow<V>,
283        TB: Borrow<T>,
284        DB: Borrow<D>,
285        I: IntoIterator<Item = SB>,
286        D: Send + Sync,
287    {
288        let batch = self.batch(updates, lower.clone(), upper.clone()).await?;
289        self.append_batch(batch, lower, upper).await
290    }
291
292    /// Applies `updates` to this shard and downgrades this handle's upper to
293    /// `new_upper` iff the current global upper of this shard is
294    /// `expected_upper`.
295    ///
296    /// The innermost `Result` is `Ok` if the updates were successfully written.
297    /// If not, an `Upper` err containing the current global upper is returned.
298    ///
299    /// In contrast to [Self::append], this linearizes mutations from all
300    /// writers. It's intended for use as an atomic primitive for timestamp
301    /// bindings, SQL tables, etc.
302    ///
303    /// All times in `updates` must be greater than or equal to `expected_upper` and
304    /// not greater than or equal to `new_upper`. A `new_upper` of the empty
305    /// antichain "finishes" this shard, promising that no more data is ever
306    /// incoming.
307    ///
308    /// `updates` may be empty, which allows for downgrading `upper` to
309    /// communicate progress. It is possible to heartbeat a writer lease by
310    /// calling this with `new_upper` equal to `self.upper()` and an empty
311    /// `updates` (making the call a no-op).
312    ///
313    /// This uses a bounded amount of memory, even when `updates` is very large.
314    /// Individual records, however, should be small enough that we can
315    /// reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to
316    /// us.
317    ///
318    /// The clunky multi-level Result is to enable more obvious error handling
319    /// in the caller. See <http://sled.rs/errors.html> for details.
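    ///
    /// A minimal usage sketch (the shard types and frontier values here are
    /// illustrative; error handling is reduced to comments):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 5u64, 1i64)];
    /// let res = write
    ///     .compare_and_append(
    ///         updates.iter(),
    ///         timely::progress::Antichain::from_elem(5),
    ///         timely::progress::Antichain::from_elem(6),
    ///     )
    ///     .await
    ///     .expect("valid usage");
    /// if let Err(mismatch) = res {
    ///     // Another writer raced us; `mismatch` carries the shard's actual upper.
    ///     let _ = mismatch;
    /// }
    /// # };
    /// ```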
320    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
321    pub async fn compare_and_append<SB, KB, VB, TB, DB, I>(
322        &mut self,
323        updates: I,
324        expected_upper: Antichain<T>,
325        new_upper: Antichain<T>,
326    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
327    where
328        SB: Borrow<((KB, VB), TB, DB)>,
329        KB: Borrow<K>,
330        VB: Borrow<V>,
331        TB: Borrow<T>,
332        DB: Borrow<D>,
333        I: IntoIterator<Item = SB>,
334        D: Send + Sync,
335    {
336        let mut batch = self
337            .batch(updates, expected_upper.clone(), new_upper.clone())
338            .await?;
339        match self
340            .compare_and_append_batch(&mut [&mut batch], expected_upper, new_upper)
341            .await
342        {
343            ok @ Ok(Ok(())) => ok,
344            err => {
345                // We cannot delete the batch in compare_and_append_batch()
346                // because the caller owns the batch and might want to retry
347                // with a different `expected_upper`. In this function, we
348                // control the batch, so we have to delete it.
349                batch.delete().await;
350                err
351            }
352        }
353    }
354
355    /// Appends the batch of updates to the shard and downgrades this handle's
356    /// upper to `upper`.
357    ///
358    /// The innermost `Result` is `Ok` if the updates were successfully written.
359    /// If not, an `Upper` err containing the current writer upper is returned.
360    /// If that happens, we also update our local `upper` to match the current
361    /// upper. This is useful in cases where a timeout happens in between a
362    /// successful write and returning that to the client.
363    ///
364    /// In contrast to [Self::compare_and_append_batch], multiple [WriteHandle]s
365    /// may be used concurrently to write to the same shard, but in this case,
366    /// the data being written must be identical (in the sense of
367    /// "definite"-ness). It's intended for replicated use by source ingestion,
368    /// sinks, etc.
369    ///
370    /// An `upper` of the empty antichain "finishes" this shard, promising that
371    /// no more data is ever incoming.
372    ///
373    /// The batch may be empty, which allows for downgrading `upper` to
374    /// communicate progress. It is possible to heartbeat a writer lease by
375    /// calling this with `upper` equal to `self.upper()` and an empty `updates`
376    /// (making the call a no-op).
377    ///
378    /// The clunky multi-level Result is to enable more obvious error handling
379    /// in the caller. See <http://sled.rs/errors.html> for details.
380    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
381    pub async fn append_batch(
382        &mut self,
383        mut batch: Batch<K, V, T, D>,
384        mut lower: Antichain<T>,
385        upper: Antichain<T>,
386    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
387    where
388        D: Send + Sync,
389    {
390        loop {
391            let res = self
392                .compare_and_append_batch(&mut [&mut batch], lower.clone(), upper.clone())
393                .await?;
394            match res {
395                Ok(()) => {
396                    self.upper = upper;
397                    return Ok(Ok(()));
398                }
399                Err(mismatch) => {
400                    // We tried to do a non-contiguous append; that won't work.
401                    if PartialOrder::less_than(&mismatch.current, &lower) {
402                        self.upper.clone_from(&mismatch.current);
403
404                        batch.delete().await;
405
406                        return Ok(Err(mismatch));
407                    } else if PartialOrder::less_than(&mismatch.current, &upper) {
408                        // Cut down the Description by advancing its lower to the current shard
409                        // upper and try again. IMPORTANT: We can only advance the lower, meaning
410                        // we cut updates away, we must not "extend" the batch by changing to a
411                        // lower that is not beyond the current lower. This invariant is checked by
412                        // the first if branch: if `!(current_upper < lower)` then it holds that
413                        // `lower <= current_upper`.
414                        lower = mismatch.current;
415                    } else {
416                        // We already have updates past this batch's upper; the append is a no-op.
417                        self.upper = mismatch.current;
418
419                        // Because we return a success result, the caller will
420                        // think that the batch was consumed or otherwise used,
421                        // so we have to delete it here.
422                        batch.delete().await;
423
424                        return Ok(Ok(()));
425                    }
426                }
427            }
428        }
429    }
430
431    /// Appends the batch of updates to the shard and downgrades this handle's
432    /// upper to `new_upper` iff the current global upper of this shard is
433    /// `expected_upper`.
434    ///
435    /// The innermost `Result` is `Ok` if the batch was successfully written. If
436    /// not, an `Upper` err containing the current global upper is returned.
437    ///
438    /// In contrast to [Self::append_batch], this linearizes mutations from all
439    /// writers. It's intended for use as an atomic primitive for timestamp
440    /// bindings, SQL tables, etc.
441    ///
442    /// A `new_upper` of the empty antichain "finishes" this shard, promising
443    /// that no more data is ever incoming.
444    ///
445    /// The batch may be empty, which allows for downgrading `upper` to
446    /// communicate progress. It is possible to heartbeat a writer lease by
447    /// calling this with `new_upper` equal to `self.upper()` and an empty
448    /// `updates` (making the call a no-op).
449    ///
450    /// IMPORTANT: In case of an erroneous result the caller is responsible for
451    /// the lifecycle of the `batch`. It can be deleted or it can be used to
452    /// retry with adjusted frontiers.
453    ///
454    /// The clunky multi-level Result is to enable more obvious error handling
455    /// in the caller. See <http://sled.rs/errors.html> for details.
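    ///
    /// A minimal sketch of staging updates with [Self::builder] and appending the
    /// resulting batch atomically (types and frontier values here are
    /// illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// // Stage a batch covering [0, 5) in blob storage.
    /// let mut builder = write.builder(timely::progress::Antichain::from_elem(0));
    /// builder
    ///     .add(&"k".to_owned(), &"v".to_owned(), &1u64, &1i64)
    ///     .await
    ///     .expect("valid usage");
    /// let mut batch = builder
    ///     .finish(timely::progress::Antichain::from_elem(5))
    ///     .await
    ///     .expect("valid usage");
    /// // Append it iff the shard's upper is still [0].
    /// write
    ///     .compare_and_append_batch(
    ///         &mut [&mut batch],
    ///         timely::progress::Antichain::from_elem(0),
    ///         timely::progress::Antichain::from_elem(5),
    ///     )
    ///     .await
    /// # };
    /// ```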
456    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
457    pub async fn compare_and_append_batch(
458        &mut self,
459        batches: &mut [&mut Batch<K, V, T, D>],
460        expected_upper: Antichain<T>,
461        new_upper: Antichain<T>,
462    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
463    where
464        D: Send + Sync,
465    {
466        for batch in batches.iter() {
467            if self.machine.shard_id() != batch.shard_id() {
468                return Err(InvalidUsage::BatchNotFromThisShard {
469                    batch_shard: batch.shard_id(),
470                    handle_shard: self.machine.shard_id(),
471                });
472            }
473            check_data_version(&self.cfg.build_version, &batch.version);
474            if self.cfg.build_version > batch.version {
475                info!(
476                    shard_id =? self.machine.shard_id(),
477                    batch_version =? batch.version,
478                    writer_version =? self.cfg.build_version,
479                    "Appending batch from the past. This is fine but should be rare. \
480                    TODO: Error on very old versions once the leaked blob detector exists."
481                )
482            }
483        }
484
485        let lower = expected_upper.clone();
486        let upper = new_upper;
487        let since = Antichain::from_elem(T::minimum());
488        let desc = Description::new(lower, upper, since);
489
490        let mut received_inline_backpressure = false;
491        // Every hollow part must belong to some batch, so we can clean it up when the batch is dropped...
492        // but if we need to merge all our inline parts to a single run in S3, it's not correct to
493        // associate that with any of our individual input batches.
494        // At first, we'll try to put all the inline parts we receive into state... but if we
495        // get backpressured, we retry with this builder set to `Some`, put all our inline data into
496        // it, and ensure it's flushed out to S3 before including it in the batch.
497        let mut inline_batch_builder: Option<(_, BatchBuilder<K, V, T, D>)> = None;
498        let maintenance = loop {
499            let any_batch_rewrite = batches
500                .iter()
501                .any(|x| x.batch.parts.iter().any(|x| x.ts_rewrite().is_some()));
502            let (mut parts, mut num_updates, mut run_splits, mut run_metas) =
503                (vec![], 0, vec![], vec![]);
504            let mut key_storage = None;
505            let mut val_storage = None;
506            for batch in batches.iter() {
507                let () = validate_truncate_batch(&batch.batch, &desc, any_batch_rewrite)?;
508                for (run_meta, run) in batch.batch.runs() {
509                    let start_index = parts.len();
510                    for part in run {
511                        if let (
512                            RunPart::Single(
513                                batch_part @ BatchPart::Inline {
514                                    updates,
515                                    ts_rewrite,
516                                    schema_id: _,
517                                    deprecated_schema_id: _,
518                                },
519                            ),
520                            Some((schema_cache, builder)),
521                        ) = (part, &mut inline_batch_builder)
522                        {
523                            let schema_migration = PartMigration::new(
524                                batch_part,
525                                self.write_schemas.clone(),
526                                schema_cache,
527                            )
528                            .await
529                            .expect("schemas for inline user part");
530
531                            let encoded_part = EncodedPart::from_inline(
532                                &*self.metrics,
533                                self.metrics.read.compaction.clone(),
534                                desc.clone(),
535                                updates,
536                                ts_rewrite.as_ref(),
537                            );
538                            let mut fetched_part = FetchedPart::new(
539                                Arc::clone(&self.metrics),
540                                encoded_part,
541                                schema_migration,
542                                FetchBatchFilter::Compaction {
543                                    since: desc.since().clone(),
544                                },
545                                false,
546                                PartDecodeFormat::Arrow,
547                                None,
548                            );
549
550                            while let Some(((k, v), t, d)) =
551                                fetched_part.next_with_storage(&mut key_storage, &mut val_storage)
552                            {
553                                builder
554                                    .add(
555                                        &k.expect("decoded just-encoded key data"),
556                                        &v.expect("decoded just-encoded val data"),
557                                        &t,
558                                        &d,
559                                    )
560                                    .await
561                                    .expect("re-encoding just-decoded data");
562                            }
563                        } else {
564                            parts.push(part.clone())
565                        }
566                    }
567
568                    let end_index = parts.len();
569
570                    if start_index == end_index {
571                        continue;
572                    }
573
574                    // Mark the boundary if this is not the first run in the batch.
575                    if start_index != 0 {
576                        run_splits.push(start_index);
577                    }
578                    run_metas.push(run_meta.clone());
579                }
580                num_updates += batch.batch.len;
581            }
582
583            let mut flushed_inline_batch = if let Some((_, builder)) = inline_batch_builder.take() {
584                let mut finished = builder
585                    .finish(desc.upper().clone())
586                    .await
587                    .expect("invalid usage");
588                let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
589                finished
590                    .flush_to_blob(
591                        &cfg,
592                        &self.metrics.inline.backpressure,
593                        &self.isolated_runtime,
594                        &self.write_schemas,
595                    )
596                    .await;
597                Some(finished)
598            } else {
599                None
600            };
601
602            if let Some(batch) = &flushed_inline_batch {
603                for (run_meta, run) in batch.batch.runs() {
604                    assert!(run.len() > 0);
605                    let start_index = parts.len();
606                    if start_index != 0 {
607                        run_splits.push(start_index);
608                    }
609                    run_metas.push(run_meta.clone());
610                    parts.extend(run.iter().cloned())
611                }
612            }
613
614            let combined_batch =
615                HollowBatch::new(desc.clone(), parts, num_updates, run_metas, run_splits);
616            let heartbeat_timestamp = (self.cfg.now)();
617            let res = self
618                .machine
619                .compare_and_append(
620                    &combined_batch,
621                    &self.writer_id,
622                    &self.debug_state,
623                    heartbeat_timestamp,
624                )
625                .await;
626
627            match res {
628                CompareAndAppendRes::Success(_seqno, maintenance) => {
629                    self.upper.clone_from(desc.upper());
630                    for batch in batches.iter_mut() {
631                        batch.mark_consumed();
632                    }
633                    if let Some(batch) = &mut flushed_inline_batch {
634                        batch.mark_consumed();
635                    }
636                    break maintenance;
637                }
638                CompareAndAppendRes::InvalidUsage(invalid_usage) => {
639                    if let Some(batch) = flushed_inline_batch.take() {
640                        batch.delete().await;
641                    }
642                    return Err(invalid_usage);
643                }
644                CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => {
645                    if let Some(batch) = flushed_inline_batch.take() {
646                        batch.delete().await;
647                    }
648                    // We tried to do a compare_and_append with the wrong expected upper; that
649                    // won't work. Update the cached upper to the current upper.
650                    self.upper.clone_from(&current_upper);
651                    return Ok(Err(UpperMismatch {
652                        current: current_upper,
653                        expected: expected_upper,
654                    }));
655                }
656                CompareAndAppendRes::InlineBackpressure => {
657                    // We tried to write an inline part, but there was already
658                    // too much in state. Flush it out to S3 and try again.
659                    assert_eq!(received_inline_backpressure, false);
660                    received_inline_backpressure = true;
661                    if COMBINE_INLINE_WRITES.get(&self.cfg) {
662                        inline_batch_builder = Some((
663                            self.machine.applier.schema_cache(),
664                            self.builder(desc.lower().clone()),
665                        ));
666                        continue;
667                    }
668
669                    let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
670                    // We could have a large number of inline parts (imagine the
671                    // sharded persist_sink), so do this flushing concurrently.
672                    let flush_batches = batches
673                        .iter_mut()
674                        .map(|batch| async {
675                            batch
676                                .flush_to_blob(
677                                    &cfg,
678                                    &self.metrics.inline.backpressure,
679                                    &self.isolated_runtime,
680                                    &self.write_schemas,
681                                )
682                                .await
683                        })
684                        .collect::<FuturesUnordered<_>>();
685                    let () = flush_batches.collect::<()>().await;
686
687                    for batch in batches.iter() {
688                        assert_eq!(batch.batch.inline_bytes(), 0);
689                    }
690
691                    continue;
692                }
693            }
694        };
695
696        maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
697
698        Ok(Ok(()))
699    }
700
701    /// Turns the given [`ProtoBatch`] back into a [`Batch`] which can be used
702    /// to append it to this shard.
703    pub fn batch_from_transmittable_batch(&self, batch: ProtoBatch) -> Batch<K, V, T, D> {
704        let shard_id: ShardId = batch
705            .shard_id
706            .into_rust()
707            .expect("valid transmittable batch");
708        assert_eq!(shard_id, self.machine.shard_id());
709
710        let ret = Batch {
711            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
712            metrics: Arc::clone(&self.metrics),
713            shard_metrics: Arc::clone(&self.machine.applier.shard_metrics),
714            version: Version::parse(&batch.version).expect("valid transmittable batch"),
715            batch: batch
716                .batch
717                .into_rust_if_some("ProtoBatch::batch")
718                .expect("valid transmittable batch"),
719            blob: Arc::clone(&self.blob),
720            _phantom: std::marker::PhantomData,
721        };
722        assert_eq!(ret.shard_id(), self.machine.shard_id());
723        ret
724    }
725
726    /// Returns a [BatchBuilder] that can be used to write a batch of updates to
727    /// blob storage which can then be appended to this shard using
728    /// [Self::compare_and_append_batch] or [Self::append_batch].
729    ///
730    /// It is correct to create an empty batch, which allows for downgrading
731    /// `upper` to communicate progress. (see [Self::compare_and_append_batch]
732    /// or [Self::append_batch])
733    ///
734    /// The builder uses a bounded amount of memory, even when the number of
735    /// updates is very large. Individual records, however, should be small
736    /// enough that we can reasonably chunk them up: O(KB) is definitely fine,
737    /// O(MB) come talk to us.
738    pub fn builder(&self, lower: Antichain<T>) -> BatchBuilder<K, V, T, D> {
739        let cfg = CompactConfig::new(&self.cfg, self.shard_id());
740        let parts = if let Some(max_runs) = cfg.batch.max_runs {
741            BatchParts::new_compacting::<K, V, D>(
742                cfg,
743                Description::new(
744                    lower.clone(),
745                    Antichain::new(),
746                    Antichain::from_elem(T::minimum()),
747                ),
748                max_runs,
749                Arc::clone(&self.metrics),
750                Arc::clone(&self.machine.applier.shard_metrics),
751                self.shard_id(),
752                Arc::clone(&self.blob),
753                Arc::clone(&self.isolated_runtime),
754                &self.metrics.user,
755                self.write_schemas.clone(),
756            )
757        } else {
758            BatchParts::new_ordered(
759                cfg.batch,
760                RunOrder::Unordered,
761                Arc::clone(&self.metrics),
762                Arc::clone(&self.machine.applier.shard_metrics),
763                self.shard_id(),
764                Arc::clone(&self.blob),
765                Arc::clone(&self.isolated_runtime),
766                &self.metrics.user,
767            )
768        };
769        let builder = BatchBuilderInternal::new(
770            BatchBuilderConfig::new(&self.cfg, self.shard_id()),
771            parts,
772            Arc::clone(&self.metrics),
773            self.write_schemas.clone(),
774            Arc::clone(&self.blob),
775            self.machine.shard_id().clone(),
776            self.cfg.build_version.clone(),
777        );
778        BatchBuilder::new(
779            builder,
780            Description::new(lower, Antichain::new(), Antichain::from_elem(T::minimum())),
781        )
782    }
783
784    /// Uploads the given `updates` as one `Batch` to the blob store and returns
785    /// a handle to the batch.
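    ///
    /// A minimal usage sketch (types and frontier values are illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 0u64, 1i64)];
    /// let _batch = write
    ///     .batch(
    ///         updates.iter(),
    ///         timely::progress::Antichain::from_elem(0),
    ///         timely::progress::Antichain::from_elem(5),
    ///     )
    ///     .await
    ///     .expect("valid usage");
    /// // The returned handle can later be passed to `append_batch` or
    /// // `compare_and_append_batch`.
    /// # };
    /// ```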
786    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
787    pub async fn batch<SB, KB, VB, TB, DB, I>(
788        &mut self,
789        updates: I,
790        lower: Antichain<T>,
791        upper: Antichain<T>,
792    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>>
793    where
794        SB: Borrow<((KB, VB), TB, DB)>,
795        KB: Borrow<K>,
796        VB: Borrow<V>,
797        TB: Borrow<T>,
798        DB: Borrow<D>,
799        I: IntoIterator<Item = SB>,
800    {
801        let iter = updates.into_iter();
802
803        let mut builder = self.builder(lower.clone());
804
805        for update in iter {
806            let ((k, v), t, d) = update.borrow();
807            let (k, v, t, d) = (k.borrow(), v.borrow(), t.borrow(), d.borrow());
808            match builder.add(k, v, t, d).await {
809                Ok(Added::Record | Added::RecordAndParts) => (),
810                Err(invalid_usage) => return Err(invalid_usage),
811            }
812        }
813
814        builder.finish(upper.clone()).await
815    }
816
817    /// Blocks until the given `frontier` is less than the upper of the shard.
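    ///
    /// A minimal usage sketch (the frontier value is illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// // Resolves once some writer has advanced the shard's upper past 5.
    /// write.wait_for_upper_past(&timely::progress::Antichain::from_elem(5)).await;
    /// # };
    /// ```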
818    pub async fn wait_for_upper_past(&mut self, frontier: &Antichain<T>) {
819        let mut watch = self.machine.applier.watch();
820        let batch = self
821            .machine
822            .next_listen_batch(frontier, &mut watch, None, None)
823            .await;
824        if PartialOrder::less_than(&self.upper, batch.desc.upper()) {
825            self.upper.clone_from(batch.desc.upper());
826        }
827        assert!(PartialOrder::less_than(frontier, &self.upper));
828    }
829
830    /// Politely expires this writer, releasing any associated state.
831    ///
832    /// There is a best-effort impl in Drop to expire a writer that wasn't
833    /// explictly expired with this method. When possible, explicit expiry is
834    /// explicitly expired with this method. When possible, explicit expiry is
835    /// still preferred because the Drop one is best effort and is dependent on
836    /// is a bit subtle). Also, explicit expiry allows for control over when it
837    /// happens.
838    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
839    pub async fn expire(mut self) {
840        let Some(expire_fn) = self.expire_fn.take() else {
841            return;
842        };
843        expire_fn.0().await;
844    }
845
846    fn expire_fn(
847        machine: Machine<K, V, T, D>,
848        gc: GarbageCollector<K, V, T, D>,
849        writer_id: WriterId,
850    ) -> ExpireFn {
851        ExpireFn(Box::new(move || {
852            Box::pin(async move {
853                let (_, maintenance) = machine.expire_writer(&writer_id).await;
854                maintenance.start_performing(&machine, &gc);
855            })
856        }))
857    }
858
859    /// Test helper for an [Self::append] call that is expected to succeed.
860    #[cfg(test)]
861    #[track_caller]
862    pub async fn expect_append<L, U>(&mut self, updates: &[((K, V), T, D)], lower: L, new_upper: U)
863    where
864        L: Into<Antichain<T>>,
865        U: Into<Antichain<T>>,
866        D: Send + Sync,
867    {
868        self.append(updates.iter(), lower.into(), new_upper.into())
869            .await
870            .expect("invalid usage")
871            .expect("unexpected upper");
872    }
873
874    /// Test helper for a [Self::compare_and_append] call that is expected to
875    /// succeed.
876    #[cfg(test)]
877    #[track_caller]
878    pub async fn expect_compare_and_append(
879        &mut self,
880        updates: &[((K, V), T, D)],
881        expected_upper: T,
882        new_upper: T,
883    ) where
884        D: Send + Sync,
885    {
886        self.compare_and_append(
887            updates.iter().map(|((k, v), t, d)| ((k, v), t, d)),
888            Antichain::from_elem(expected_upper),
889            Antichain::from_elem(new_upper),
890        )
891        .await
892        .expect("invalid usage")
893        .expect("unexpected upper")
894    }
895
896    /// Test helper for a [Self::compare_and_append_batch] call that is expected
897    /// to succeed.
898    #[cfg(test)]
899    #[track_caller]
900    pub async fn expect_compare_and_append_batch(
901        &mut self,
902        batches: &mut [&mut Batch<K, V, T, D>],
903        expected_upper: T,
904        new_upper: T,
905    ) {
906        self.compare_and_append_batch(
907            batches,
908            Antichain::from_elem(expected_upper),
909            Antichain::from_elem(new_upper),
910        )
911        .await
912        .expect("invalid usage")
913        .expect("unexpected upper")
914    }
915
916    /// Test helper for a [Self::batch] call that is expected to succeed.
917    #[cfg(test)]
918    #[track_caller]
919    pub async fn expect_batch(
920        &mut self,
921        updates: &[((K, V), T, D)],
922        lower: T,
923        upper: T,
924    ) -> Batch<K, V, T, D> {
925        self.batch(
926            updates.iter(),
927            Antichain::from_elem(lower),
928            Antichain::from_elem(upper),
929        )
930        .await
931        .expect("invalid usage")
932    }
933}
934
935impl<K: Codec, V: Codec, T, D> Drop for WriteHandle<K, V, T, D> {
936    fn drop(&mut self) {
937        let Some(expire_fn) = self.expire_fn.take() else {
938            return;
939        };
940        let handle = match Handle::try_current() {
941            Ok(x) => x,
942            Err(_) => {
943                warn!(
944                    "WriteHandle {} dropped without being explicitly expired, falling back to lease timeout",
945                    self.writer_id
946                );
947                return;
948            }
949        };
950        // Spawn a best-effort task to expire this write handle. It's fine if
951        // this doesn't run to completion; we'd just have to wait out the lease
952        // before the shard-global since is unblocked.
953        //
954        // Intentionally create the span outside the task to set the parent.
955        let expire_span = debug_span!("drop::expire");
956        handle.spawn_named(
957            || format!("WriteHandle::expire ({})", self.writer_id),
958            expire_fn.0().instrument(expire_span),
959        );
960    }
961}
962
963#[cfg(test)]
964mod tests {
965    use std::str::FromStr;
966    use std::sync::mpsc;
967
968    use differential_dataflow::consolidation::consolidate_updates;
969    use futures_util::FutureExt;
970    use mz_dyncfg::ConfigUpdates;
971    use mz_ore::collections::CollectionExt;
972    use mz_ore::task;
973    use serde_json::json;
974
975    use crate::cache::PersistClientCache;
976    use crate::tests::{all_ok, new_test_client};
977    use crate::{PersistLocation, ShardId};
978
979    use super::*;
980
981    #[mz_persist_proc::test(tokio::test)]
982    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
983    async fn empty_batches(dyncfgs: ConfigUpdates) {
984        let data = [
985            (("1".to_owned(), "one".to_owned()), 1, 1),
986            (("2".to_owned(), "two".to_owned()), 2, 1),
987            (("3".to_owned(), "three".to_owned()), 3, 1),
988        ];
989
990        let (mut write, _) = new_test_client(&dyncfgs)
991            .await
992            .expect_open::<String, String, u64, i64>(ShardId::new())
993            .await;
994        let blob = Arc::clone(&write.blob);
995
996        // Write an initial batch.
997        let mut upper = 3;
998        write.expect_append(&data[..2], vec![0], vec![upper]).await;
999
1000        // Write a bunch of empty batches. This shouldn't write blobs, so the count should stay the same.
1001        let mut count_before = 0;
1002        blob.list_keys_and_metadata("", &mut |_| {
1003            count_before += 1;
1004        })
1005        .await
1006        .expect("list_keys failed");
1007        for _ in 0..5 {
1008            let new_upper = upper + 1;
1009            write.expect_compare_and_append(&[], upper, new_upper).await;
1010            upper = new_upper;
1011        }
1012        let mut count_after = 0;
1013        blob.list_keys_and_metadata("", &mut |_| {
1014            count_after += 1;
1015        })
1016        .await
1017        .expect("list_keys failed");
1018        assert_eq!(count_after, count_before);
1019    }
1020
1021    #[mz_persist_proc::test(tokio::test)]
1022    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1023    async fn compare_and_append_batch_multi(dyncfgs: ConfigUpdates) {
1024        let data0 = vec![
1025            (("1".to_owned(), "one".to_owned()), 1, 1),
1026            (("2".to_owned(), "two".to_owned()), 2, 1),
1027            (("4".to_owned(), "four".to_owned()), 4, 1),
1028        ];
1029        let data1 = vec![
1030            (("1".to_owned(), "one".to_owned()), 1, 1),
1031            (("2".to_owned(), "two".to_owned()), 2, 1),
1032            (("3".to_owned(), "three".to_owned()), 3, 1),
1033        ];
1034
1035        let (mut write, mut read) = new_test_client(&dyncfgs)
1036            .await
1037            .expect_open::<String, String, u64, i64>(ShardId::new())
1038            .await;
1039
1040        let mut batch0 = write.expect_batch(&data0, 0, 5).await;
1041        let mut batch1 = write.expect_batch(&data1, 0, 4).await;
1042
1043        write
1044            .expect_compare_and_append_batch(&mut [&mut batch0, &mut batch1], 0, 4)
1045            .await;
1046
1047        let batch = write
1048            .machine
1049            .snapshot(&Antichain::from_elem(3))
1050            .await
1051            .expect("just wrote this")
1052            .into_element();
1053
1054        assert!(batch.runs().count() >= 2);
1055
1056        let expected = vec![
1057            (("1".to_owned(), "one".to_owned()), 1, 2),
1058            (("2".to_owned(), "two".to_owned()), 2, 2),
1059            (("3".to_owned(), "three".to_owned()), 3, 1),
1060        ];
1061        let mut actual = read.expect_snapshot_and_fetch(3).await;
1062        consolidate_updates(&mut actual);
1063        assert_eq!(actual, all_ok(&expected, 3));
1064    }
1065
1066    #[mz_ore::test]
1067    fn writer_id_human_readable_serde() {
1068        #[derive(Debug, Serialize, Deserialize)]
1069        struct Container {
1070            writer_id: WriterId,
1071        }
1072
1073        // roundtrip through json
1074        let id = WriterId::from_str("w00000000-1234-5678-0000-000000000000").expect("valid id");
1075        assert_eq!(
1076            id,
1077            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
1078                .expect("deserializable")
1079        );
1080
1081        // deserialize a serialized string directly
1082        assert_eq!(
1083            id,
1084            serde_json::from_str("\"w00000000-1234-5678-0000-000000000000\"")
1085                .expect("deserializable")
1086        );
1087
1088        // roundtrip id through a container type
1089        let json = json!({ "writer_id": id });
1090        assert_eq!(
1091            "{\"writer_id\":\"w00000000-1234-5678-0000-000000000000\"}",
1092            &json.to_string()
1093        );
1094        let container: Container = serde_json::from_value(json).expect("deserializable");
1095        assert_eq!(container.writer_id, id);
1096    }
1097
1098    #[mz_persist_proc::test(tokio::test)]
1099    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1100    async fn hollow_batch_roundtrip(dyncfgs: ConfigUpdates) {
1101        let data = vec![
1102            (("1".to_owned(), "one".to_owned()), 1, 1),
1103            (("2".to_owned(), "two".to_owned()), 2, 1),
1104            (("3".to_owned(), "three".to_owned()), 3, 1),
1105        ];
1106
1107        let (mut write, mut read) = new_test_client(&dyncfgs)
1108            .await
1109            .expect_open::<String, String, u64, i64>(ShardId::new())
1110            .await;
1111
1112        // This test is a bit more complex than it should be. It would be easier
1113        // if we could just compare the rehydrated batch to the original batch.
1114        // But a) turning a batch into a hollow batch consumes it, and b) Batch
1115        // doesn't have Eq/PartialEq.
1116        let batch = write.expect_batch(&data, 0, 4).await;
1117        let hollow_batch = batch.into_transmittable_batch();
1118        let mut rehydrated_batch = write.batch_from_transmittable_batch(hollow_batch);
1119
1120        write
1121            .expect_compare_and_append_batch(&mut [&mut rehydrated_batch], 0, 4)
1122            .await;
1123
1124        let expected = vec![
1125            (("1".to_owned(), "one".to_owned()), 1, 1),
1126            (("2".to_owned(), "two".to_owned()), 2, 1),
1127            (("3".to_owned(), "three".to_owned()), 3, 1),
1128        ];
1129        let mut actual = read.expect_snapshot_and_fetch(3).await;
1130        consolidate_updates(&mut actual);
1131        assert_eq!(actual, all_ok(&expected, 3));
1132    }
1133
1134    #[mz_persist_proc::test(tokio::test)]
1135    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1136    async fn wait_for_upper_past(dyncfgs: ConfigUpdates) {
1137        let client = new_test_client(&dyncfgs).await;
1138        let (mut write, _) = client.expect_open::<(), (), u64, i64>(ShardId::new()).await;
1139        let five = Antichain::from_elem(5);
1140
1141        // Upper is not past 5.
1142        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);
1143
1144        // Upper is still not past 5.
1145        write
1146            .expect_compare_and_append(&[(((), ()), 1, 1)], 0, 5)
1147            .await;
1148        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);
1149
1150        // Upper is past 5.
1151        write
1152            .expect_compare_and_append(&[(((), ()), 5, 1)], 5, 7)
1153            .await;
1154        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), Some(()));
1155        assert_eq!(write.upper(), &Antichain::from_elem(7));
1156
1157        // Waiting for previous uppers does not regress the handle's cached
1158        // upper.
1159        assert_eq!(
1160            write
1161                .wait_for_upper_past(&Antichain::from_elem(2))
1162                .now_or_never(),
1163            Some(())
1164        );
1165        assert_eq!(write.upper(), &Antichain::from_elem(7));
1166    }
1167
1168    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
1169    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1170    async fn fetch_recent_upper_linearized() {
1171        type Timestamp = u64;
1172        let max_upper = 1000;
1173
1174        let shard_id = ShardId::new();
1175        let mut clients = PersistClientCache::new_no_metrics();
1176        let upper_writer_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
1177        let (mut upper_writer, _) = upper_writer_client
1178            .expect_open::<(), (), Timestamp, i64>(shard_id)
1179            .await;
1180        // Clear the state cache between each client to maximally disconnect
1181        // them from each other.
1182        clients.clear_state_cache();
1183        let upper_reader_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
1184        let (mut upper_reader, _) = upper_reader_client
1185            .expect_open::<(), (), Timestamp, i64>(shard_id)
1186            .await;
1187        let (tx, rx) = mpsc::channel();
1188
1189        let task = task::spawn(|| "upper-reader", async move {
1190            let mut upper = Timestamp::MIN;
1191
1192            while upper < max_upper {
1193                while let Ok(new_upper) = rx.try_recv() {
1194                    upper = new_upper;
1195                }
1196
1197                let recent_upper = upper_reader
1198                    .fetch_recent_upper()
1199                    .await
1200                    .as_option()
1201                    .cloned()
1202                    .expect("u64 is totally ordered and the shard is not finalized");
1203                assert!(
1204                    recent_upper >= upper,
1205                    "recent upper {recent_upper:?} is less than known upper {upper:?}"
1206                );
1207            }
1208        });
1209
1210        for upper in Timestamp::MIN..max_upper {
1211            let next_upper = upper + 1;
1212            upper_writer
1213                .expect_compare_and_append(&[], upper, next_upper)
1214                .await;
1215            tx.send(next_upper).expect("send failed");
1216        }
1217
1218        task.await.expect("await failed");
1219    }
1220}