Skip to main content

mz_persist_client/
schema.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Persist shard schema information.
11
12use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::sync::{Arc, RwLock};
15use std::time::Instant;
16
17use differential_dataflow::difference::Monoid;
18use differential_dataflow::lattice::Lattice;
19use mz_ore::cast::CastFrom;
20use mz_persist_types::columnar::data_type;
21use mz_persist_types::schema::{Migration, SchemaId, backward_compatible};
22use mz_persist_types::{Codec, Codec64};
23use timely::progress::Timestamp;
24
25use crate::internal::apply::Applier;
26use crate::internal::encoding::Schemas;
27use crate::internal::metrics::{SchemaCacheMetrics, SchemaMetrics};
28use crate::internal::state::{BatchPart, EncodedSchemas};
29
/// The result returned by [crate::PersistClient::compare_and_evolve_schema].
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum CaESchema<K: Codec, V: Codec> {
    /// The schema was successfully evolved and registered with the included id.
    Ok(SchemaId),
    /// The schema was not compatible with previously registered schemas.
    Incompatible,
    /// The `expected` SchemaId did not match reality. The current one is
    /// included for ease of retry.
    ExpectedMismatch {
        /// The current schema id.
        schema_id: SchemaId,
        /// The key schema at this id.
        key: K::Schema,
        /// The val schema at this id.
        val: V::Schema,
    },
}
49
50/// A cache of decoded schemas and schema migrations.
51///
52/// The decoded schemas are a cache of the registry in state, and so are shared
53/// process-wide.
54///
55/// On the other hand, the migrations have an N^2 problem and so are per-handle.
56/// This also seems reasonable because for any given write handle, the write
57/// schema will be the same for all migration entries, and ditto for read handle
58/// and read schema.
59#[derive(Debug)]
60pub(crate) struct SchemaCache<K: Codec, V: Codec, T, D> {
61    maps: Arc<SchemaCacheMaps<K, V>>,
62    applier: Applier<K, V, T, D>,
63    key_migration_by_ids: MigrationCacheMap,
64    val_migration_by_ids: MigrationCacheMap,
65}
66
67impl<K: Codec, V: Codec, T: Clone, D> Clone for SchemaCache<K, V, T, D> {
68    fn clone(&self) -> Self {
69        Self {
70            maps: Arc::clone(&self.maps),
71            applier: self.applier.clone(),
72            key_migration_by_ids: self.key_migration_by_ids.clone(),
73            val_migration_by_ids: self.val_migration_by_ids.clone(),
74        }
75    }
76}
77
78impl<K: Codec, V: Codec, T, D> Drop for SchemaCache<K, V, T, D> {
79    fn drop(&mut self) {
80        let dropped = u64::cast_from(
81            self.key_migration_by_ids.by_ids.len() + self.val_migration_by_ids.by_ids.len(),
82        );
83        self.applier
84            .metrics
85            .schema
86            .cache_migration
87            .dropped_count
88            .inc_by(dropped);
89    }
90}
91
92impl<K, V, T, D> SchemaCache<K, V, T, D>
93where
94    K: Debug + Codec,
95    V: Debug + Codec,
96    T: Timestamp + Lattice + Codec64 + Sync,
97    D: Monoid + Codec64,
98{
    /// Returns the [Applier] backing this cache.
    ///
    /// The result borrows from `self`; nothing is cloned.
    pub(crate) fn applier(&self) -> &Applier<K, V, T, D> {
        &self.applier
    }
103
104    pub fn new(maps: Arc<SchemaCacheMaps<K, V>>, applier: Applier<K, V, T, D>) -> Self {
105        let key_migration_by_ids = MigrationCacheMap {
106            metrics: applier.metrics.schema.cache_migration.clone(),
107            by_ids: BTreeMap::new(),
108        };
109        let val_migration_by_ids = MigrationCacheMap {
110            metrics: applier.metrics.schema.cache_migration.clone(),
111            by_ids: BTreeMap::new(),
112        };
113        SchemaCache {
114            maps,
115            applier,
116            key_migration_by_ids,
117            val_migration_by_ids,
118        }
119    }
120
121    async fn schemas(&self, id: &SchemaId) -> Option<Schemas<K, V>> {
122        let key = self
123            .get_or_try_init(&self.maps.key_by_id, id, |schemas| {
124                self.maps.key_by_id.metrics.computed_count.inc();
125                schemas.get(id).map(|x| K::decode_schema(&x.key))
126            })
127            .await?;
128        let val = self
129            .get_or_try_init(&self.maps.val_by_id, id, |schemas| {
130                self.maps.val_by_id.metrics.computed_count.inc();
131                schemas.get(id).map(|x| V::decode_schema(&x.val))
132            })
133            .await?;
134        Some(Schemas {
135            id: Some(*id),
136            key,
137            val,
138        })
139    }
140
141    fn key_migration(
142        &mut self,
143        write: &Schemas<K, V>,
144        read: &Schemas<K, V>,
145    ) -> Option<Arc<Migration>> {
146        let migration_fn = || Self::migration::<K>(&write.key, &read.key);
147        let (Some(write_id), Some(read_id)) = (write.id, read.id) else {
148            // TODO: Annoying to cache this because we're missing an id. This
149            // will probably require some sort of refactor to fix so punting for
150            // now.
151            self.key_migration_by_ids.metrics.computed_count.inc();
152            return migration_fn().map(Arc::new);
153        };
154        self.key_migration_by_ids
155            .get_or_try_insert(write_id, read_id, migration_fn)
156    }
157
158    fn val_migration(
159        &mut self,
160        write: &Schemas<K, V>,
161        read: &Schemas<K, V>,
162    ) -> Option<Arc<Migration>> {
163        let migration_fn = || Self::migration::<V>(&write.val, &read.val);
164        let (Some(write_id), Some(read_id)) = (write.id, read.id) else {
165            // TODO: Annoying to cache this because we're missing an id. This
166            // will probably require some sort of refactor to fix so punting for
167            // now.
168            self.val_migration_by_ids.metrics.computed_count.inc();
169            return migration_fn().map(Arc::new);
170        };
171        self.val_migration_by_ids
172            .get_or_try_insert(write_id, read_id, migration_fn)
173    }
174
175    fn migration<C: Codec>(write: &C::Schema, read: &C::Schema) -> Option<Migration> {
176        let write_dt = data_type::<C>(write).expect("valid schema");
177        let read_dt = data_type::<C>(read).expect("valid schema");
178        backward_compatible(&write_dt, &read_dt)
179    }
180
181    async fn get_or_try_init<MK: Clone + Ord, MV: PartialEq + Debug>(
182        &self,
183        map: &SchemaCacheMap<MK, MV>,
184        key: &MK,
185        f: impl Fn(&BTreeMap<SchemaId, EncodedSchemas>) -> Option<MV>,
186    ) -> Option<Arc<MV>> {
187        let ret = map.get_or_try_init(key, || {
188            self.applier
189                .schemas(|seqno, schemas| f(schemas).ok_or(seqno))
190        });
191        let seqno = match ret {
192            Ok(ret) => return Some(ret),
193            Err(seqno) => seqno,
194        };
195        self.applier.metrics.schema.cache_fetch_state_count.inc();
196        self.applier.fetch_and_update_state(Some(seqno)).await;
197        map.get_or_try_init(key, || {
198            self.applier
199                .schemas(|seqno, schemas| f(schemas).ok_or(seqno))
200        })
201        .ok()
202    }
203}
204
205#[derive(Debug)]
206pub(crate) struct SchemaCacheMaps<K: Codec, V: Codec> {
207    key_by_id: SchemaCacheMap<SchemaId, K::Schema>,
208    val_by_id: SchemaCacheMap<SchemaId, V::Schema>,
209}
210
211impl<K: Codec, V: Codec> SchemaCacheMaps<K, V> {
212    pub(crate) fn new(metrics: &SchemaMetrics) -> Self {
213        Self {
214            key_by_id: SchemaCacheMap {
215                metrics: metrics.cache_schema.clone(),
216                map: RwLock::new(BTreeMap::new()),
217            },
218            val_by_id: SchemaCacheMap {
219                metrics: metrics.cache_schema.clone(),
220                map: RwLock::new(BTreeMap::new()),
221            },
222        }
223    }
224}
225
226#[derive(Debug)]
227struct SchemaCacheMap<I, S> {
228    metrics: SchemaCacheMetrics,
229    map: RwLock<BTreeMap<I, Arc<S>>>,
230}
231
232impl<I: Clone + Ord, S: PartialEq + Debug> SchemaCacheMap<I, S> {
233    fn get_or_try_init<E>(
234        &self,
235        id: &I,
236        state_fn: impl FnOnce() -> Result<S, E>,
237    ) -> Result<Arc<S>, E> {
238        // First see if we have the value cached.
239        {
240            let map = self.map.read().expect("lock");
241            if let Some(ret) = map.get(id).map(Arc::clone) {
242                self.metrics.cached_count.inc();
243                return Ok(ret);
244            }
245        }
246        // If not, see if we can get the value from current state.
247        let ret = state_fn().map(Arc::new);
248        if let Ok(val) = ret.as_ref() {
249            let mut map = self.map.write().expect("lock");
250            // If any answers got written in the meantime, they should be the
251            // same, so just overwrite
252            let prev = map.insert(id.clone(), Arc::clone(val));
253            match prev {
254                Some(prev) => debug_assert_eq!(*val, prev),
255                None => self.metrics.added_count.inc(),
256            }
257        } else {
258            self.metrics.unavailable_count.inc();
259        }
260        ret
261    }
262}
263
264impl<I, K> Drop for SchemaCacheMap<I, K> {
265    fn drop(&mut self) {
266        let map = self.map.read().expect("lock");
267        self.metrics.dropped_count.inc_by(u64::cast_from(map.len()));
268    }
269}
270
271#[derive(Debug, Clone)]
272struct MigrationCacheMap {
273    metrics: SchemaCacheMetrics,
274    by_ids: BTreeMap<(SchemaId, SchemaId), Arc<Migration>>,
275}
276
277impl MigrationCacheMap {
278    fn get_or_try_insert(
279        &mut self,
280        write_id: SchemaId,
281        read_id: SchemaId,
282        migration_fn: impl FnOnce() -> Option<Migration>,
283    ) -> Option<Arc<Migration>> {
284        if let Some(migration) = self.by_ids.get(&(write_id, read_id)) {
285            self.metrics.cached_count.inc();
286            return Some(Arc::clone(migration));
287        };
288        self.metrics.computed_count.inc();
289        let migration = migration_fn().map(Arc::new);
290        if let Some(migration) = migration.as_ref() {
291            self.metrics.added_count.inc();
292            // We just looked this up above and we've got mutable access, so no
293            // race issues.
294            self.by_ids
295                .insert((write_id, read_id), Arc::clone(migration));
296        } else {
297            self.metrics.unavailable_count.inc();
298        }
299        migration
300    }
301}
302
303#[derive(Debug)]
304pub(crate) enum PartMigration<K: Codec, V: Codec> {
305    /// No-op!
306    SameSchema { both: Schemas<K, V> },
307    /// We don't have a schema id for write schema.
308    Schemaless { read: Schemas<K, V> },
309    /// We have both write and read schemas, and they don't match.
310    Either {
311        write: Schemas<K, V>,
312        read: Schemas<K, V>,
313        key_migration: Arc<Migration>,
314        val_migration: Arc<Migration>,
315    },
316}
317
318impl<K: Codec, V: Codec> Clone for PartMigration<K, V> {
319    fn clone(&self) -> Self {
320        match self {
321            Self::SameSchema { both } => Self::SameSchema { both: both.clone() },
322            Self::Schemaless { read } => Self::Schemaless { read: read.clone() },
323            Self::Either {
324                write,
325                read,
326                key_migration,
327                val_migration,
328            } => Self::Either {
329                write: write.clone(),
330                read: read.clone(),
331                key_migration: Arc::clone(key_migration),
332                val_migration: Arc::clone(val_migration),
333            },
334        }
335    }
336}
337
338impl<K, V> PartMigration<K, V>
339where
340    K: Debug + Codec,
341    V: Debug + Codec,
342{
343    pub(crate) async fn new<T, D>(
344        part: &BatchPart<T>,
345        read: Schemas<K, V>,
346        schema_cache: &mut SchemaCache<K, V, T, D>,
347    ) -> Result<Self, Schemas<K, V>>
348    where
349        T: Timestamp + Lattice + Codec64 + Sync,
350        D: Monoid + Codec64,
351    {
352        // At one point in time during our structured data migration, we deprecated the
353        // already written schema IDs because we made all columns at the Arrow/Parquet
354        // level nullable, thus changing the schema parts were written with.
355        //
356        // _After_ this deprecation, we've observed at least one instance where a
357        // structured only Part was written with the schema ID in the _old_ deprecated
358        // field. While unexpected, given the ordering of our releases it is safe to
359        // use the deprecated schema ID if we have a structured only part.
360        let write = match (part.schema_id(), part.deprecated_schema_id()) {
361            (Some(write_id), _) => Some(write_id),
362            (None, Some(deprecated_id))
363                if part.is_structured_only(&schema_cache.applier.metrics.columnar) =>
364            {
365                tracing::warn!(?deprecated_id, "falling back to deprecated schema ID");
366                Some(deprecated_id)
367            }
368            (None, _) => None,
369        };
370
371        match (write, read.id) {
372            (None, _) => Ok(PartMigration::Schemaless { read }),
373            (Some(w), Some(r)) if w == r => Ok(PartMigration::SameSchema { both: read }),
374            (Some(w), _) => {
375                let write = schema_cache
376                    .schemas(&w)
377                    .await
378                    .expect("appended part should reference registered schema");
379                // Even if we missing a schema id, if the schemas are equal, use
380                // `SameSchema`. This isn't a correctness issue, we'd just
381                // generate NoOp migrations, but it'll make the metrics more
382                // intuitive.
383                if write.key == read.key && write.val == read.val {
384                    return Ok(PartMigration::SameSchema { both: read });
385                }
386
387                let start = Instant::now();
388                let key_migration = schema_cache
389                    .key_migration(&write, &read)
390                    .ok_or_else(|| read.clone())?;
391                let val_migration = schema_cache
392                    .val_migration(&write, &read)
393                    .ok_or_else(|| read.clone())?;
394                schema_cache
395                    .applier
396                    .metrics
397                    .schema
398                    .migration_new_count
399                    .inc();
400                schema_cache
401                    .applier
402                    .metrics
403                    .schema
404                    .migration_new_seconds
405                    .inc_by(start.elapsed().as_secs_f64());
406
407                Ok(PartMigration::Either {
408                    write,
409                    read,
410                    key_migration,
411                    val_migration,
412                })
413            }
414        }
415    }
416}
417
418impl<K: Codec, V: Codec> PartMigration<K, V> {
419    pub(crate) fn codec_read(&self) -> &Schemas<K, V> {
420        match self {
421            PartMigration::SameSchema { both } => both,
422            PartMigration::Schemaless { read } => read,
423            PartMigration::Either { read, .. } => read,
424        }
425    }
426}
427
428#[cfg(test)]
429mod tests {
430    use arrow::array::{
431        Array, ArrayBuilder, StringArray, StringBuilder, StructArray, as_string_array,
432    };
433    use arrow::datatypes::{DataType, Field};
434    use bytes::BufMut;
435    use futures::StreamExt;
436    use mz_dyncfg::ConfigUpdates;
437    use mz_persist_types::ShardId;
438    use mz_persist_types::arrow::ArrayOrd;
439    use mz_persist_types::codec_impls::UnitSchema;
440    use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema};
441    use mz_persist_types::stats::{NoneStats, StructStats};
442    use timely::progress::Antichain;
443
444    use crate::Diagnostics;
445    use crate::cli::admin::info_log_non_zero_metrics;
446    use crate::read::ReadHandle;
447    use crate::tests::new_test_client;
448
449    use super::*;
450
451    #[mz_ore::test]
452    fn schema_id() {
453        assert_eq!(SchemaId(1).to_string(), "h1");
454        assert_eq!(SchemaId::try_from("h1".to_owned()), Ok(SchemaId(1)));
455        assert!(SchemaId::try_from("nope".to_owned()).is_err());
456    }
457
458    #[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
459    struct Strings(Vec<String>);
460
461    impl Codec for Strings {
462        type Schema = StringsSchema;
463        type Storage = ();
464
465        fn codec_name() -> String {
466            "Strings".into()
467        }
468
469        fn encode<B: BufMut>(&self, buf: &mut B) {
470            buf.put_slice(self.0.join(",").as_bytes());
471        }
472        fn decode<'a>(buf: &'a [u8], schema: &Self::Schema) -> Result<Self, String> {
473            let buf = std::str::from_utf8(buf).map_err(|err| err.to_string())?;
474            let mut ret = buf.split(",").map(|x| x.to_owned()).collect::<Vec<_>>();
475            // Fill in nulls or drop columns to match the requested schema.
476            while schema.0.len() > ret.len() {
477                ret.push("".into());
478            }
479            while schema.0.len() < ret.len() {
480                ret.pop();
481            }
482            Ok(Strings(ret))
483        }
484
485        fn encode_schema(schema: &Self::Schema) -> bytes::Bytes {
486            schema
487                .0
488                .iter()
489                .map(|x| x.then_some('n').unwrap_or(' '))
490                .collect::<String>()
491                .into_bytes()
492                .into()
493        }
494        fn decode_schema(buf: &bytes::Bytes) -> Self::Schema {
495            let buf = std::str::from_utf8(buf).expect("valid schema");
496            StringsSchema(
497                buf.chars()
498                    .map(|x| match x {
499                        'n' => true,
500                        ' ' => false,
501                        _ => unreachable!(),
502                    })
503                    .collect(),
504            )
505        }
506    }
507
508    #[derive(Debug, Clone, Default, PartialEq)]
509    struct StringsSchema(Vec<bool>);
510
511    impl Schema<Strings> for StringsSchema {
512        type ArrowColumn = StructArray;
513        type Statistics = NoneStats;
514        type Decoder = StringsDecoder;
515        type Encoder = StringsEncoder;
516
517        fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
518            let mut cols = Vec::new();
519            for (idx, _) in self.0.iter().enumerate() {
520                cols.push(as_string_array(col.column_by_name(&idx.to_string()).unwrap()).clone());
521            }
522            Ok(StringsDecoder(cols))
523        }
524        fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
525            let mut fields = Vec::new();
526            let mut arrays = Vec::new();
527            for (idx, nullable) in self.0.iter().enumerate() {
528                fields.push(Field::new(idx.to_string(), DataType::Utf8, *nullable));
529                arrays.push(StringBuilder::new());
530            }
531            Ok(StringsEncoder { fields, arrays })
532        }
533    }
534
535    #[derive(Debug)]
536    struct StringsDecoder(Vec<StringArray>);
537    impl ColumnDecoder<Strings> for StringsDecoder {
538        fn decode(&self, idx: usize, val: &mut Strings) {
539            val.0.clear();
540            for col in self.0.iter() {
541                if col.is_valid(idx) {
542                    val.0.push(col.value(idx).into());
543                } else {
544                    val.0.push("".into());
545                }
546            }
547        }
548        fn is_null(&self, _: usize) -> bool {
549            false
550        }
551        fn goodbytes(&self) -> usize {
552            self.0
553                .iter()
554                .map(|val| ArrayOrd::String(val.clone()).goodbytes())
555                .sum()
556        }
557        fn stats(&self) -> StructStats {
558            StructStats {
559                len: self.0[0].len(),
560                cols: Default::default(),
561            }
562        }
563    }
564
565    #[derive(Debug)]
566    struct StringsEncoder {
567        fields: Vec<Field>,
568        arrays: Vec<StringBuilder>,
569    }
570    impl ColumnEncoder<Strings> for StringsEncoder {
571        type FinishedColumn = StructArray;
572
573        fn goodbytes(&self) -> usize {
574            self.arrays.iter().map(|a| a.values_slice().len()).sum()
575        }
576
577        fn append(&mut self, val: &Strings) {
578            for (idx, val) in val.0.iter().enumerate() {
579                if val.is_empty() {
580                    self.arrays[idx].append_null();
581                } else {
582                    self.arrays[idx].append_value(val);
583                }
584            }
585        }
586        fn append_null(&mut self) {
587            unreachable!()
588        }
589        fn finish(self) -> Self::FinishedColumn {
590            assert_eq!(self.fields.len(), self.arrays.len(), "invalid schema");
591            if self.fields.is_empty() {
592                StructArray::new_empty_fields(0, None)
593            } else {
594                let arrays = self
595                    .arrays
596                    .into_iter()
597                    .map(|mut x| ArrayBuilder::finish(&mut x))
598                    .collect();
599                StructArray::new(self.fields.into(), arrays, None)
600            }
601        }
602    }
603
604    #[mz_persist_proc::test(tokio::test)]
605    #[cfg_attr(miri, ignore)]
606    async fn compare_and_evolve_schema(dyncfgs: ConfigUpdates) {
607        let client = new_test_client(&dyncfgs).await;
608        let d = Diagnostics::for_tests();
609        let shard_id = ShardId::new();
610        let schema0 = StringsSchema(vec![false]);
611        let schema1 = StringsSchema(vec![false, true]);
612
613        let mut write0 = client
614            .open_writer::<Strings, (), u64, i64>(
615                shard_id,
616                Arc::new(schema0.clone()),
617                Arc::new(UnitSchema),
618                d.clone(),
619            )
620            .await
621            .unwrap();
622
623        write0.try_register_schema().await;
624        assert_eq!(write0.write_schemas.id.unwrap(), SchemaId(0));
625
626        // Not backward compatible (yet... we don't support dropping a column at
627        // the moment).
628        let res = client
629            .compare_and_evolve_schema::<Strings, (), u64, i64>(
630                shard_id,
631                SchemaId(0),
632                &StringsSchema(vec![]),
633                &UnitSchema,
634                d.clone(),
635            )
636            .await
637            .unwrap();
638        assert_eq!(res, CaESchema::Incompatible);
639
640        // Incorrect expectation
641        let res = client
642            .compare_and_evolve_schema::<Strings, (), u64, i64>(
643                shard_id,
644                SchemaId(1),
645                &schema1,
646                &UnitSchema,
647                d.clone(),
648            )
649            .await
650            .unwrap();
651        assert_eq!(
652            res,
653            CaESchema::ExpectedMismatch {
654                schema_id: SchemaId(0),
655                key: schema0,
656                val: UnitSchema
657            }
658        );
659
660        // Successful evolution
661        let res = client
662            .compare_and_evolve_schema::<Strings, (), u64, i64>(
663                shard_id,
664                SchemaId(0),
665                &schema1,
666                &UnitSchema,
667                d.clone(),
668            )
669            .await
670            .unwrap();
671        assert_eq!(res, CaESchema::Ok(SchemaId(1)));
672
673        // Create a write handle with the new schema and validate that it picks
674        // up the correct schema id.
675        let write1 = client
676            .open_writer::<Strings, (), u64, i64>(
677                shard_id,
678                Arc::new(schema1),
679                Arc::new(UnitSchema),
680                d.clone(),
681            )
682            .await
683            .unwrap();
684        assert_eq!(write1.write_schemas.id.unwrap(), SchemaId(1));
685    }
686
687    fn strings(xs: &[((Strings, ()), u64, i64)]) -> Vec<Vec<&str>> {
688        xs.iter()
689            .map(|((k, _), _, _)| k.0.iter().map(|x| x.as_str()).collect())
690            .collect()
691    }
692
693    #[mz_persist_proc::test(tokio::test)]
694    #[cfg_attr(miri, ignore)]
695    async fn schema_evolution(dyncfgs: ConfigUpdates) {
696        async fn snap_streaming(
697            as_of: u64,
698            read: &mut ReadHandle<Strings, (), u64, i64>,
699        ) -> Vec<((Strings, ()), u64, i64)> {
700            // NB: We test with both snapshot_and_fetch and snapshot_and_stream
701            // because one uses the consolidating iter and one doesn't.
702            let mut ret = read
703                .snapshot_and_stream(Antichain::from_elem(as_of))
704                .await
705                .unwrap()
706                .collect::<Vec<_>>()
707                .await;
708            ret.sort();
709            ret
710        }
711
712        let client = new_test_client(&dyncfgs).await;
713        let d = Diagnostics::for_tests();
714        let shard_id = ShardId::new();
715        let schema0 = StringsSchema(vec![false]);
716        let schema1 = StringsSchema(vec![false, true]);
717
718        // Write some data at the original schema.
719        let (mut write0, mut read0) = client
720            .open::<Strings, (), u64, i64>(
721                shard_id,
722                Arc::new(schema0.clone()),
723                Arc::new(UnitSchema),
724                d.clone(),
725                true,
726            )
727            .await
728            .unwrap();
729        write0
730            .expect_compare_and_append(&[((Strings(vec!["0 before".into()]), ()), 0, 1)], 0, 1)
731            .await;
732        let expected = vec![vec!["0 before"]];
733        assert_eq!(strings(&snap_streaming(0, &mut read0).await), expected);
734        assert_eq!(strings(&read0.expect_snapshot_and_fetch(0).await), expected);
735
736        // Register and write some data at the new schema.
737        let res = client
738            .compare_and_evolve_schema::<Strings, (), u64, i64>(
739                shard_id,
740                SchemaId(0),
741                &schema1,
742                &UnitSchema,
743                d.clone(),
744            )
745            .await
746            .unwrap();
747        assert_eq!(res, CaESchema::Ok(SchemaId(1)));
748        let (mut write1, mut read1) = client
749            .open::<Strings, (), u64, i64>(
750                shard_id,
751                Arc::new(schema1.clone()),
752                Arc::new(UnitSchema),
753                d.clone(),
754                true,
755            )
756            .await
757            .unwrap();
758        write1
759            .expect_compare_and_append(
760                &[
761                    ((Strings(vec!["1 null".into(), "".into()]), ()), 1, 1),
762                    ((Strings(vec!["1 not".into(), "x".into()]), ()), 1, 1),
763                ],
764                1,
765                2,
766            )
767            .await;
768
769        // Continue to write data with the original schema.
770        write0
771            .expect_compare_and_append(&[((Strings(vec!["0 after".into()]), ()), 2, 1)], 2, 3)
772            .await;
773
774        // Original schema drops the new column in data written by new schema.
775        let expected = vec![
776            vec!["0 after"],
777            vec!["0 before"],
778            vec!["1 not"],
779            vec!["1 null"],
780        ];
781        assert_eq!(strings(&snap_streaming(2, &mut read0).await), expected);
782        assert_eq!(strings(&read0.expect_snapshot_and_fetch(2).await), expected);
783
784        // New schema adds nulls (represented by empty string in Strings) in
785        // data written by old schema.
786        let expected = vec![
787            vec!["0 after", ""],
788            vec!["0 before", ""],
789            vec!["1 not", "x"],
790            vec!["1 null", ""],
791        ];
792        assert_eq!(strings(&snap_streaming(2, &mut read1).await), expected);
793        assert_eq!(strings(&read1.expect_snapshot_and_fetch(2).await), expected);
794
795        // Probably too spammy to leave in the logs, but it was useful to have
796        // hooked up while iterating.
797        if false {
798            info_log_non_zero_metrics(&client.metrics.registry.gather());
799        }
800    }
801}