mz_persist_client/schema.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Persist shard schema information.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::{Arc, RwLock};
use std::time::Instant;

use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use mz_ore::cast::CastFrom;
use mz_persist_types::columnar::data_type;
use mz_persist_types::schema::{Migration, SchemaId, backward_compatible};
use mz_persist_types::{Codec, Codec64};
use timely::progress::Timestamp;

use crate::internal::apply::Applier;
use crate::internal::encoding::Schemas;
use crate::internal::metrics::{SchemaCacheMetrics, SchemaMetrics};
use crate::internal::state::{BatchPart, EncodedSchemas};

/// The result returned by [crate::PersistClient::compare_and_evolve_schema].
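///
/// A sketch of handling this result when evolving a shard's schema (not a
/// doctest; `client`, `shard_id`, `expected`, the new schemas, `diagnostics`,
/// and the `use_schema_id`/`retry_with` helpers are stand-ins for values the
/// caller already has):
///
/// ```ignore
/// match client
///     .compare_and_evolve_schema::<K, V, u64, i64>(
///         shard_id,
///         expected,
///         &new_key_schema,
///         &new_val_schema,
///         diagnostics,
///     )
///     .await
///     .expect("valid usage")
/// {
///     // The evolved schema is now registered under `id`.
///     CaESchema::Ok(id) => use_schema_id(id),
///     // The proposed schema can't be migrated to from the registered one.
///     CaESchema::Incompatible => panic!("schema change not backward compatible"),
///     // `expected` was stale; retry against the current schema id.
///     CaESchema::ExpectedMismatch { schema_id, .. } => retry_with(schema_id),
/// }
/// ```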
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum CaESchema<K: Codec, V: Codec> {
    /// The schema was successfully evolved and registered with the included id.
    Ok(SchemaId),
    /// The schema was not compatible with previously registered schemas.
    Incompatible,
    /// The `expected` SchemaId did not match reality. The current one is
    /// included for ease of retry.
    ExpectedMismatch {
        /// The current schema id.
        schema_id: SchemaId,
        /// The key schema at this id.
        key: K::Schema,
        /// The val schema at this id.
        val: V::Schema,
    },
}

/// A cache of decoded schemas and schema migrations.
///
/// The decoded schemas are a cache of the registry in state, and so are shared
/// process-wide.
///
/// On the other hand, the migrations have an N^2 problem and so are per-handle.
/// This also seems reasonable because for any given write handle, the write
/// schema will be the same for all migration entries, and ditto for read handle
/// and read schema.
#[derive(Debug)]
pub(crate) struct SchemaCache<K: Codec, V: Codec, T, D> {
    maps: Arc<SchemaCacheMaps<K, V>>,
    applier: Applier<K, V, T, D>,
    key_migration_by_ids: MigrationCacheMap,
    val_migration_by_ids: MigrationCacheMap,
}

impl<K: Codec, V: Codec, T: Clone, D> Clone for SchemaCache<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            maps: Arc::clone(&self.maps),
            applier: self.applier.clone(),
            key_migration_by_ids: self.key_migration_by_ids.clone(),
            val_migration_by_ids: self.val_migration_by_ids.clone(),
        }
    }
}

impl<K: Codec, V: Codec, T, D> Drop for SchemaCache<K, V, T, D> {
    fn drop(&mut self) {
        let dropped = u64::cast_from(
            self.key_migration_by_ids.by_ids.len() + self.val_migration_by_ids.by_ids.len(),
        );
        self.applier
            .metrics
            .schema
            .cache_migration
            .dropped_count
            .inc_by(dropped);
    }
}

impl<K, V, T, D> SchemaCache<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64,
{
    pub fn new(maps: Arc<SchemaCacheMaps<K, V>>, applier: Applier<K, V, T, D>) -> Self {
        let key_migration_by_ids = MigrationCacheMap {
            metrics: applier.metrics.schema.cache_migration.clone(),
            by_ids: BTreeMap::new(),
        };
        let val_migration_by_ids = MigrationCacheMap {
            metrics: applier.metrics.schema.cache_migration.clone(),
            by_ids: BTreeMap::new(),
        };
        SchemaCache {
            maps,
            applier,
            key_migration_by_ids,
            val_migration_by_ids,
        }
    }

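    /// Returns the decoded key and val schemas registered under `id`,
    /// consulting the shared cache first and refetching state on a miss.
    /// Returns `None` if no such schema is registered.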
    async fn schemas(&self, id: &SchemaId) -> Option<Schemas<K, V>> {
        let key = self
            .get_or_try_init(&self.maps.key_by_id, id, |schemas| {
                self.maps.key_by_id.metrics.computed_count.inc();
                schemas.get(id).map(|x| K::decode_schema(&x.key))
            })
            .await?;
        let val = self
            .get_or_try_init(&self.maps.val_by_id, id, |schemas| {
                self.maps.val_by_id.metrics.computed_count.inc();
                schemas.get(id).map(|x| V::decode_schema(&x.val))
            })
            .await?;
        Some(Schemas {
            id: Some(*id),
            key,
            val,
        })
    }

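    /// Returns the migration (if any) from the write key schema to the read
    /// key schema, cached by `(write_id, read_id)` when both ids are known.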
    fn key_migration(
        &mut self,
        write: &Schemas<K, V>,
        read: &Schemas<K, V>,
    ) -> Option<Arc<Migration>> {
        let migration_fn = || Self::migration::<K>(&write.key, &read.key);
        let (Some(write_id), Some(read_id)) = (write.id, read.id) else {
            // TODO: Annoying to cache this because we're missing an id. This
            // will probably require some sort of refactor to fix so punting for
            // now.
            self.key_migration_by_ids.metrics.computed_count.inc();
            return migration_fn().map(Arc::new);
        };
        self.key_migration_by_ids
            .get_or_try_insert(write_id, read_id, migration_fn)
    }

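    /// As [Self::key_migration], but for the val schemas.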
    fn val_migration(
        &mut self,
        write: &Schemas<K, V>,
        read: &Schemas<K, V>,
    ) -> Option<Arc<Migration>> {
        let migration_fn = || Self::migration::<V>(&write.val, &read.val);
        let (Some(write_id), Some(read_id)) = (write.id, read.id) else {
            // TODO: Annoying to cache this because we're missing an id. This
            // will probably require some sort of refactor to fix so punting for
            // now.
            self.val_migration_by_ids.metrics.computed_count.inc();
            return migration_fn().map(Arc::new);
        };
        self.val_migration_by_ids
            .get_or_try_insert(write_id, read_id, migration_fn)
    }

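    /// Computes a migration from the write schema's Arrow data type to the
    /// read schema's, if the two are backward compatible.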
    fn migration<C: Codec>(write: &C::Schema, read: &C::Schema) -> Option<Migration> {
        let write_dt = data_type::<C>(write).expect("valid schema");
        let read_dt = data_type::<C>(read).expect("valid schema");
        backward_compatible(&write_dt, &read_dt)
    }

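    /// Looks up `key` in `map`, first against the currently cached state and,
    /// on a miss, once more after fetching state at least as recent as the
    /// seqno at which the miss was observed.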
    async fn get_or_try_init<MK: Clone + Ord, MV: PartialEq + Debug>(
        &self,
        map: &SchemaCacheMap<MK, MV>,
        key: &MK,
        f: impl Fn(&BTreeMap<SchemaId, EncodedSchemas>) -> Option<MV>,
    ) -> Option<Arc<MV>> {
        let ret = map.get_or_try_init(key, || {
            self.applier
                .schemas(|seqno, schemas| f(schemas).ok_or(seqno))
        });
        let seqno = match ret {
            Ok(ret) => return Some(ret),
            Err(seqno) => seqno,
        };
        self.applier.metrics.schema.cache_fetch_state_count.inc();
        self.applier.fetch_and_update_state(Some(seqno)).await;
        map.get_or_try_init(key, || {
            self.applier
                .schemas(|seqno, schemas| f(schemas).ok_or(seqno))
        })
        .ok()
    }
}

#[derive(Debug)]
pub(crate) struct SchemaCacheMaps<K: Codec, V: Codec> {
    key_by_id: SchemaCacheMap<SchemaId, K::Schema>,
    val_by_id: SchemaCacheMap<SchemaId, V::Schema>,
}

impl<K: Codec, V: Codec> SchemaCacheMaps<K, V> {
    pub(crate) fn new(metrics: &SchemaMetrics) -> Self {
        Self {
            key_by_id: SchemaCacheMap {
                metrics: metrics.cache_schema.clone(),
                map: RwLock::new(BTreeMap::new()),
            },
            val_by_id: SchemaCacheMap {
                metrics: metrics.cache_schema.clone(),
                map: RwLock::new(BTreeMap::new()),
            },
        }
    }
}

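/// A shared, read-through cache from an id to a decoded schema, with metrics
/// for hits, misses, and lookups that could not be satisfied.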
#[derive(Debug)]
struct SchemaCacheMap<I, S> {
    metrics: SchemaCacheMetrics,
    map: RwLock<BTreeMap<I, Arc<S>>>,
}

impl<I: Clone + Ord, S: PartialEq + Debug> SchemaCacheMap<I, S> {
    fn get_or_try_init<E>(
        &self,
        id: &I,
        state_fn: impl FnOnce() -> Result<S, E>,
    ) -> Result<Arc<S>, E> {
        // First see if we have the value cached.
        {
            let map = self.map.read().expect("lock");
            if let Some(ret) = map.get(id).map(Arc::clone) {
                self.metrics.cached_count.inc();
                return Ok(ret);
            }
        }
        // If not, see if we can get the value from current state.
        let ret = state_fn().map(Arc::new);
        if let Ok(val) = ret.as_ref() {
            let mut map = self.map.write().expect("lock");
            // If any answers got written in the meantime, they should be the
            // same, so just overwrite.
            let prev = map.insert(id.clone(), Arc::clone(val));
            match prev {
                Some(prev) => debug_assert_eq!(*val, prev),
                None => self.metrics.added_count.inc(),
            }
        } else {
            self.metrics.unavailable_count.inc();
        }
        ret
    }
}

impl<I, K> Drop for SchemaCacheMap<I, K> {
    fn drop(&mut self) {
        let map = self.map.read().expect("lock");
        self.metrics.dropped_count.inc_by(u64::cast_from(map.len()));
    }
}

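/// A per-handle cache of computed migrations, keyed by the (write, read) pair
/// of [SchemaId]s.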
#[derive(Debug, Clone)]
struct MigrationCacheMap {
    metrics: SchemaCacheMetrics,
    by_ids: BTreeMap<(SchemaId, SchemaId), Arc<Migration>>,
}

impl MigrationCacheMap {
    fn get_or_try_insert(
        &mut self,
        write_id: SchemaId,
        read_id: SchemaId,
        migration_fn: impl FnOnce() -> Option<Migration>,
    ) -> Option<Arc<Migration>> {
        if let Some(migration) = self.by_ids.get(&(write_id, read_id)) {
            self.metrics.cached_count.inc();
            return Some(Arc::clone(migration));
        };
        self.metrics.computed_count.inc();
        let migration = migration_fn().map(Arc::new);
        if let Some(migration) = migration.as_ref() {
            self.metrics.added_count.inc();
            // We just looked this up above and we've got mutable access, so no
            // race issues.
            self.by_ids
                .insert((write_id, read_id), Arc::clone(migration));
        } else {
            self.metrics.unavailable_count.inc();
        }
        migration
    }
}

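/// How the data in a [BatchPart], written at some schema, should be read at
/// the requested read schemas.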
#[derive(Debug)]
pub(crate) enum PartMigration<K: Codec, V: Codec> {
    /// No-op!
    SameSchema { both: Schemas<K, V> },
    /// We don't have a schema id for the write schema.
    Schemaless { read: Schemas<K, V> },
    /// We have both write and read schemas, and they don't match.
    Either {
        write: Schemas<K, V>,
        read: Schemas<K, V>,
        key_migration: Arc<Migration>,
        val_migration: Arc<Migration>,
    },
}

impl<K: Codec, V: Codec> Clone for PartMigration<K, V> {
    fn clone(&self) -> Self {
        match self {
            Self::SameSchema { both } => Self::SameSchema { both: both.clone() },
            Self::Schemaless { read } => Self::Schemaless { read: read.clone() },
            Self::Either {
                write,
                read,
                key_migration,
                val_migration,
            } => Self::Either {
                write: write.clone(),
                read: read.clone(),
                key_migration: Arc::clone(key_migration),
                val_migration: Arc::clone(val_migration),
            },
        }
    }
}

impl<K, V> PartMigration<K, V>
where
    K: Debug + Codec,
    V: Debug + Codec,
{
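    /// Determines how to read `part`, which was written at some registered
    /// schema, at the given `read` schemas. Returns `Err` with the read
    /// schemas if no backward-compatible migration exists.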
    pub(crate) async fn new<T, D>(
        part: &BatchPart<T>,
        read: Schemas<K, V>,
        schema_cache: &mut SchemaCache<K, V, T, D>,
    ) -> Result<Self, Schemas<K, V>>
    where
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Semigroup + Codec64,
    {
        // At one point during our structured data migration, we deprecated the
        // already-written schema IDs because we made all columns at the Arrow/Parquet
        // level nullable, thus changing the schema that parts were written with.
        //
        // _After_ this deprecation, we've observed at least one instance where a
        // structured-only part was written with the schema ID in the _old_ deprecated
        // field. While unexpected, given the ordering of our releases it is safe to
        // use the deprecated schema ID if we have a structured-only part.
        let write = match (part.schema_id(), part.deprecated_schema_id()) {
            (Some(write_id), _) => Some(write_id),
            (None, Some(deprecated_id))
                if part.is_structured_only(&schema_cache.applier.metrics.columnar) =>
            {
                tracing::warn!(?deprecated_id, "falling back to deprecated schema ID");
                Some(deprecated_id)
            }
            (None, _) => None,
        };

        match (write, read.id) {
            (None, _) => Ok(PartMigration::Schemaless { read }),
            (Some(w), Some(r)) if w == r => Ok(PartMigration::SameSchema { both: read }),
            (Some(w), _) => {
                let write = schema_cache
                    .schemas(&w)
                    .await
                    .expect("appended part should reference registered schema");
                // Even if we're missing a schema id, if the schemas are equal,
                // use `SameSchema`. This isn't a correctness issue; we'd just
                // generate NoOp migrations, but using `SameSchema` makes the
                // metrics more intuitive.
                if write.key == read.key && write.val == read.val {
                    return Ok(PartMigration::SameSchema { both: read });
                }

                let start = Instant::now();
                let key_migration = schema_cache
                    .key_migration(&write, &read)
                    .ok_or_else(|| read.clone())?;
                let val_migration = schema_cache
                    .val_migration(&write, &read)
                    .ok_or_else(|| read.clone())?;
                schema_cache
                    .applier
                    .metrics
                    .schema
                    .migration_new_count
                    .inc();
                schema_cache
                    .applier
                    .metrics
                    .schema
                    .migration_new_seconds
                    .inc_by(start.elapsed().as_secs_f64());

                Ok(PartMigration::Either {
                    write,
                    read,
                    key_migration,
                    val_migration,
                })
            }
        }
    }
}

impl<K: Codec, V: Codec> PartMigration<K, V> {
    pub(crate) fn codec_read(&self) -> &Schemas<K, V> {
        match self {
            PartMigration::SameSchema { both } => both,
            PartMigration::Schemaless { read } => read,
            PartMigration::Either { read, .. } => read,
        }
    }
}

#[cfg(test)]
mod tests {
    use arrow::array::{
        Array, ArrayBuilder, StringArray, StringBuilder, StructArray, as_string_array,
    };
    use arrow::datatypes::{DataType, Field};
    use bytes::BufMut;
    use futures::StreamExt;
    use mz_dyncfg::ConfigUpdates;
    use mz_persist_types::ShardId;
    use mz_persist_types::arrow::ArrayOrd;
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema};
    use mz_persist_types::stats::{NoneStats, StructStats};
    use timely::progress::Antichain;

    use crate::Diagnostics;
    use crate::cli::admin::info_log_non_zero_metrics;
    use crate::read::ReadHandle;
    use crate::tests::new_test_client;

    use super::*;

    #[mz_ore::test]
    fn schema_id() {
        assert_eq!(SchemaId(1).to_string(), "h1");
        assert_eq!(SchemaId::try_from("h1".to_owned()), Ok(SchemaId(1)));
        assert!(SchemaId::try_from("nope".to_owned()).is_err());
    }

    #[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
    struct Strings(Vec<String>);

    impl Codec for Strings {
        type Schema = StringsSchema;
        type Storage = ();

        fn codec_name() -> String {
            "Strings".into()
        }

        fn encode<B: BufMut>(&self, buf: &mut B) {
            buf.put_slice(self.0.join(",").as_bytes());
        }
        fn decode<'a>(buf: &'a [u8], schema: &Self::Schema) -> Result<Self, String> {
            let buf = std::str::from_utf8(buf).map_err(|err| err.to_string())?;
            let mut ret = buf.split(",").map(|x| x.to_owned()).collect::<Vec<_>>();
            // Fill in nulls or drop columns to match the requested schema.
            while schema.0.len() > ret.len() {
                ret.push("".into());
            }
            while schema.0.len() < ret.len() {
                ret.pop();
            }
            Ok(Strings(ret))
        }

        fn encode_schema(schema: &Self::Schema) -> bytes::Bytes {
            schema
                .0
                .iter()
                .map(|x| x.then_some('n').unwrap_or(' '))
                .collect::<String>()
                .into_bytes()
                .into()
        }
        fn decode_schema(buf: &bytes::Bytes) -> Self::Schema {
            let buf = std::str::from_utf8(buf).expect("valid schema");
            StringsSchema(
                buf.chars()
                    .map(|x| match x {
                        'n' => true,
                        ' ' => false,
                        _ => unreachable!(),
                    })
                    .collect(),
            )
        }
    }

    #[derive(Debug, Clone, Default, PartialEq)]
    struct StringsSchema(Vec<bool>);

    impl Schema<Strings> for StringsSchema {
        type ArrowColumn = StructArray;
        type Statistics = NoneStats;
        type Decoder = StringsDecoder;
        type Encoder = StringsEncoder;

        fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
            let mut cols = Vec::new();
            for (idx, _) in self.0.iter().enumerate() {
                cols.push(as_string_array(col.column_by_name(&idx.to_string()).unwrap()).clone());
            }
            Ok(StringsDecoder(cols))
        }
        fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
            let mut fields = Vec::new();
            let mut arrays = Vec::new();
            for (idx, nullable) in self.0.iter().enumerate() {
                fields.push(Field::new(idx.to_string(), DataType::Utf8, *nullable));
                arrays.push(StringBuilder::new());
            }
            Ok(StringsEncoder { fields, arrays })
        }
    }

    #[derive(Debug)]
    struct StringsDecoder(Vec<StringArray>);
    impl ColumnDecoder<Strings> for StringsDecoder {
        fn decode(&self, idx: usize, val: &mut Strings) {
            val.0.clear();
            for col in self.0.iter() {
                if col.is_valid(idx) {
                    val.0.push(col.value(idx).into());
                } else {
                    val.0.push("".into());
                }
            }
        }
        fn is_null(&self, _: usize) -> bool {
            false
        }
        fn goodbytes(&self) -> usize {
            self.0
                .iter()
                .map(|val| ArrayOrd::String(val.clone()).goodbytes())
                .sum()
        }
        fn stats(&self) -> StructStats {
            StructStats {
                len: self.0[0].len(),
                cols: Default::default(),
            }
        }
    }

    #[derive(Debug)]
    struct StringsEncoder {
        fields: Vec<Field>,
        arrays: Vec<StringBuilder>,
    }
    impl ColumnEncoder<Strings> for StringsEncoder {
        type FinishedColumn = StructArray;

        fn goodbytes(&self) -> usize {
            self.arrays.iter().map(|a| a.values_slice().len()).sum()
        }

        fn append(&mut self, val: &Strings) {
            for (idx, val) in val.0.iter().enumerate() {
                if val.is_empty() {
                    self.arrays[idx].append_null();
                } else {
                    self.arrays[idx].append_value(val);
                }
            }
        }
        fn append_null(&mut self) {
            unreachable!()
        }
        fn finish(self) -> Self::FinishedColumn {
            assert_eq!(self.fields.len(), self.arrays.len(), "invalid schema");
            if self.fields.is_empty() {
                StructArray::new_empty_fields(0, None)
            } else {
                let arrays = self
                    .arrays
                    .into_iter()
                    .map(|mut x| ArrayBuilder::finish(&mut x))
                    .collect();
                StructArray::new(self.fields.into(), arrays, None)
            }
        }
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn compare_and_evolve_schema(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let d = Diagnostics::for_tests();
        let shard_id = ShardId::new();
        let schema0 = StringsSchema(vec![false]);
        let schema1 = StringsSchema(vec![false, true]);

        let write0 = client
            .open_writer::<Strings, (), u64, i64>(
                shard_id,
                Arc::new(schema0.clone()),
                Arc::new(UnitSchema),
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(write0.write_schemas.id.unwrap(), SchemaId(0));

        // Not backward compatible (yet... we don't support dropping a column at
        // the moment).
        let res = client
            .compare_and_evolve_schema::<Strings, (), u64, i64>(
                shard_id,
                SchemaId(0),
                &StringsSchema(vec![]),
                &UnitSchema,
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(res, CaESchema::Incompatible);

        // Incorrect expectation
        let res = client
            .compare_and_evolve_schema::<Strings, (), u64, i64>(
                shard_id,
                SchemaId(1),
                &schema1,
                &UnitSchema,
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(
            res,
            CaESchema::ExpectedMismatch {
                schema_id: SchemaId(0),
                key: schema0,
                val: UnitSchema
            }
        );

        // Successful evolution
        let res = client
            .compare_and_evolve_schema::<Strings, (), u64, i64>(
                shard_id,
                SchemaId(0),
                &schema1,
                &UnitSchema,
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(res, CaESchema::Ok(SchemaId(1)));

        // Create a write handle with the new schema and validate that it picks
        // up the correct schema id.
        let write1 = client
            .open_writer::<Strings, (), u64, i64>(
                shard_id,
                Arc::new(schema1),
                Arc::new(UnitSchema),
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(write1.write_schemas.id.unwrap(), SchemaId(1));
    }

    fn strings(xs: &[((Result<Strings, String>, Result<(), String>), u64, i64)]) -> Vec<Vec<&str>> {
        xs.iter()
            .map(|((k, _), _, _)| k.as_ref().unwrap().0.iter().map(|x| x.as_str()).collect())
            .collect()
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn schema_evolution(dyncfgs: ConfigUpdates) {
        async fn snap_streaming(
            as_of: u64,
            read: &mut ReadHandle<Strings, (), u64, i64>,
        ) -> Vec<((Result<Strings, String>, Result<(), String>), u64, i64)> {
            // NB: We test with both snapshot_and_fetch and snapshot_and_stream
            // because one uses the consolidating iter and one doesn't.
            let mut ret = read
                .snapshot_and_stream(Antichain::from_elem(as_of))
                .await
                .unwrap()
                .collect::<Vec<_>>()
                .await;
            ret.sort();
            ret
        }

        let client = new_test_client(&dyncfgs).await;
        let d = Diagnostics::for_tests();
        let shard_id = ShardId::new();
        let schema0 = StringsSchema(vec![false]);
        let schema1 = StringsSchema(vec![false, true]);

        // Write some data at the original schema.
        let (mut write0, mut read0) = client
            .open::<Strings, (), u64, i64>(
                shard_id,
                Arc::new(schema0.clone()),
                Arc::new(UnitSchema),
                d.clone(),
                true,
            )
            .await
            .unwrap();
        write0
            .expect_compare_and_append(&[((Strings(vec!["0 before".into()]), ()), 0, 1)], 0, 1)
            .await;
        let expected = vec![vec!["0 before"]];
        assert_eq!(strings(&snap_streaming(0, &mut read0).await), expected);
        assert_eq!(strings(&read0.expect_snapshot_and_fetch(0).await), expected);

        // Register and write some data at the new schema.
        let res = client
            .compare_and_evolve_schema::<Strings, (), u64, i64>(
                shard_id,
                SchemaId(0),
                &schema1,
                &UnitSchema,
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(res, CaESchema::Ok(SchemaId(1)));
        let (mut write1, mut read1) = client
            .open::<Strings, (), u64, i64>(
                shard_id,
                Arc::new(schema1.clone()),
                Arc::new(UnitSchema),
                d.clone(),
                true,
            )
            .await
            .unwrap();
        write1
            .expect_compare_and_append(
                &[
                    ((Strings(vec!["1 null".into(), "".into()]), ()), 1, 1),
                    ((Strings(vec!["1 not".into(), "x".into()]), ()), 1, 1),
                ],
                1,
                2,
            )
            .await;

        // Continue to write data with the original schema.
        write0
            .expect_compare_and_append(&[((Strings(vec!["0 after".into()]), ()), 2, 1)], 2, 3)
            .await;

        // Original schema drops the new column in data written by new schema.
        let expected = vec![
            vec!["0 after"],
            vec!["0 before"],
            vec!["1 not"],
            vec!["1 null"],
        ];
        assert_eq!(strings(&snap_streaming(2, &mut read0).await), expected);
        assert_eq!(strings(&read0.expect_snapshot_and_fetch(2).await), expected);

        // New schema adds nulls (represented by empty string in Strings) in
        // data written by old schema.
        let expected = vec![
            vec!["0 after", ""],
            vec!["0 before", ""],
            vec!["1 not", "x"],
            vec!["1 null", ""],
        ];
        assert_eq!(strings(&snap_streaming(2, &mut read1).await), expected);
        assert_eq!(strings(&read1.expect_snapshot_and_fetch(2).await), expected);

        // Probably too spammy to leave in the logs, but it was useful to have
        // hooked up while iterating.
        if false {
            info_log_non_zero_metrics(&client.metrics.registry.gather());
        }
    }
}