use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::{Arc, RwLock};
use std::time::Instant;

use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use mz_ore::cast::CastFrom;
use mz_persist_types::columnar::data_type;
use mz_persist_types::schema::{Migration, SchemaId, backward_compatible};
use mz_persist_types::{Codec, Codec64};
use timely::progress::Timestamp;

use crate::internal::apply::Applier;
use crate::internal::encoding::Schemas;
use crate::internal::metrics::{SchemaCacheMetrics, SchemaMetrics};
use crate::internal::state::{BatchPart, EncodedSchemas};

/// The result of a compare-and-evolve schema request.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum CaESchema<K: Codec, V: Codec> {
    /// The schema was evolved and registered under the returned id.
    Ok(SchemaId),
    /// The requested schema is not backward compatible with the current one.
    Incompatible,
    /// The expected current schema id didn't match reality; the actual current
    /// schema is returned so the caller can decide how to proceed.
    ExpectedMismatch {
        /// The id of the schema currently registered with the shard.
        schema_id: SchemaId,
        /// The current key schema.
        key: K::Schema,
        /// The current val schema.
        val: V::Schema,
    },
}
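
// A caller-side sketch of handling `CaESchema`; the call shape mirrors the
// `compare_and_evolve_schema` usage in the tests below:
//
//     match client
//         .compare_and_evolve_schema::<Strings, (), u64, i64>(
//             shard_id, SchemaId(0), &schema1, &UnitSchema, diagnostics,
//         )
//         .await
//         .unwrap()
//     {
//         CaESchema::Ok(id) => { /* new writers can open against `id` */ }
//         CaESchema::Incompatible => { /* pick a backward compatible schema */ }
//         CaESchema::ExpectedMismatch { schema_id, .. } => {
//             /* raced with another evolution; re-read and retry */
//         }
//     }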

/// A cache of decoded schemas and schema migrations.
///
/// The decoded schemas are shared by all clones of this cache (via the
/// `Arc`ed [SchemaCacheMaps]); the computed migrations are cached per
/// instance.
#[derive(Debug)]
pub(crate) struct SchemaCache<K: Codec, V: Codec, T, D> {
    maps: Arc<SchemaCacheMaps<K, V>>,
    applier: Applier<K, V, T, D>,
    key_migration_by_ids: MigrationCacheMap,
    val_migration_by_ids: MigrationCacheMap,
}

// By-hand impl so we don't require `K`, `V`, or `D` to be `Clone`.
impl<K: Codec, V: Codec, T: Clone, D> Clone for SchemaCache<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            maps: Arc::clone(&self.maps),
            applier: self.applier.clone(),
            key_migration_by_ids: self.key_migration_by_ids.clone(),
            val_migration_by_ids: self.val_migration_by_ids.clone(),
        }
    }
}

impl<K: Codec, V: Codec, T, D> Drop for SchemaCache<K, V, T, D> {
    fn drop(&mut self) {
        // Account for this instance's migration cache entries going away.
        let dropped = u64::cast_from(
            self.key_migration_by_ids.by_ids.len() + self.val_migration_by_ids.by_ids.len(),
        );
        self.applier
            .metrics
            .schema
            .cache_migration
            .dropped_count
            .inc_by(dropped);
    }
}

impl<K, V, T, D> SchemaCache<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64,
{
    pub fn new(maps: Arc<SchemaCacheMaps<K, V>>, applier: Applier<K, V, T, D>) -> Self {
        let key_migration_by_ids = MigrationCacheMap {
            metrics: applier.metrics.schema.cache_migration.clone(),
            by_ids: BTreeMap::new(),
        };
        let val_migration_by_ids = MigrationCacheMap {
            metrics: applier.metrics.schema.cache_migration.clone(),
            by_ids: BTreeMap::new(),
        };
        SchemaCache {
            maps,
            applier,
            key_migration_by_ids,
            val_migration_by_ids,
        }
    }

    async fn schemas(&self, id: &SchemaId) -> Option<Schemas<K, V>> {
        let key = self
            .get_or_try_init(&self.maps.key_by_id, id, |schemas| {
                self.maps.key_by_id.metrics.computed_count.inc();
                schemas.get(id).map(|x| K::decode_schema(&x.key))
            })
            .await?;
        let val = self
            .get_or_try_init(&self.maps.val_by_id, id, |schemas| {
                self.maps.val_by_id.metrics.computed_count.inc();
                schemas.get(id).map(|x| V::decode_schema(&x.val))
            })
            .await?;
        Some(Schemas {
            id: Some(*id),
            key,
            val,
        })
    }

    fn key_migration(
        &mut self,
        write: &Schemas<K, V>,
        read: &Schemas<K, V>,
    ) -> Option<Arc<Migration>> {
        let migration_fn = || Self::migration::<K>(&write.key, &read.key);
        let (Some(write_id), Some(read_id)) = (write.id, read.id) else {
            // We can only cache by ids; if either schema is missing one,
            // compute the migration from scratch.
            self.key_migration_by_ids.metrics.computed_count.inc();
            return migration_fn().map(Arc::new);
        };
        self.key_migration_by_ids
            .get_or_try_insert(write_id, read_id, migration_fn)
    }

    fn val_migration(
        &mut self,
        write: &Schemas<K, V>,
        read: &Schemas<K, V>,
    ) -> Option<Arc<Migration>> {
        let migration_fn = || Self::migration::<V>(&write.val, &read.val);
        let (Some(write_id), Some(read_id)) = (write.id, read.id) else {
            // We can only cache by ids; if either schema is missing one,
            // compute the migration from scratch.
            self.val_migration_by_ids.metrics.computed_count.inc();
            return migration_fn().map(Arc::new);
        };
        self.val_migration_by_ids
            .get_or_try_insert(write_id, read_id, migration_fn)
    }

    fn migration<C: Codec>(write: &C::Schema, read: &C::Schema) -> Option<Migration> {
        let write_dt = data_type::<C>(write).expect("valid schema");
        let read_dt = data_type::<C>(read).expect("valid schema");
        backward_compatible(&write_dt, &read_dt)
    }
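
    // For intuition: `backward_compatible` returns `Some` only when data
    // written at `write_dt` can be read at `read_dt`, e.g. when the read
    // schema adds a nullable column. A sketch (assumes `Migration::migrate`,
    // which maps an arrow array of the write type to one of the read type):
    //
    //     if let Some(migration) = backward_compatible(&write_dt, &read_dt) {
    //         let migrated = migration.migrate(written_array);
    //         debug_assert_eq!(migrated.data_type(), &read_dt);
    //     }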

    async fn get_or_try_init<MK: Clone + Ord, MV: PartialEq + Debug>(
        &self,
        map: &SchemaCacheMap<MK, MV>,
        key: &MK,
        f: impl Fn(&BTreeMap<SchemaId, EncodedSchemas>) -> Option<MV>,
    ) -> Option<Arc<MV>> {
        // First try against the locally cached state.
        let ret = map.get_or_try_init(key, || {
            self.applier
                .schemas(|seqno, schemas| f(schemas).ok_or(seqno))
        });
        let seqno = match ret {
            Ok(ret) => return Some(ret),
            Err(seqno) => seqno,
        };
        // The schema might have been registered after the state we have as of
        // `seqno`. Fetch the latest state and try once more; a second miss
        // means the schema really isn't registered.
        self.applier.metrics.schema.cache_fetch_state_count.inc();
        self.applier.fetch_and_update_state(Some(seqno)).await;
        map.get_or_try_init(key, || {
            self.applier
                .schemas(|seqno, schemas| f(schemas).ok_or(seqno))
        })
        .ok()
    }
}
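
// How the pieces fit together (a sketch): a fetched part names the schema it
// was written with; `PartMigration::new` below resolves that id through this
// cache and computes any needed migrations, roughly:
//
//     let write = schema_cache.schemas(&write_id).await?;
//     let key_migration = schema_cache.key_migration(&write, &read)?;
//     let val_migration = schema_cache.val_migration(&write, &read)?;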

/// The portion of [SchemaCache] that is shared between all of its clones:
/// decoded key and val schemas, keyed by [SchemaId].
#[derive(Debug)]
pub(crate) struct SchemaCacheMaps<K: Codec, V: Codec> {
    key_by_id: SchemaCacheMap<SchemaId, K::Schema>,
    val_by_id: SchemaCacheMap<SchemaId, V::Schema>,
}

impl<K: Codec, V: Codec> SchemaCacheMaps<K, V> {
    pub(crate) fn new(metrics: &SchemaMetrics) -> Self {
        Self {
            key_by_id: SchemaCacheMap {
                metrics: metrics.cache_schema.clone(),
                map: RwLock::new(BTreeMap::new()),
            },
            val_by_id: SchemaCacheMap {
                metrics: metrics.cache_schema.clone(),
                map: RwLock::new(BTreeMap::new()),
            },
        }
    }
}

#[derive(Debug)]
struct SchemaCacheMap<I, S> {
    metrics: SchemaCacheMetrics,
    map: RwLock<BTreeMap<I, Arc<S>>>,
}

impl<I: Clone + Ord, S: PartialEq + Debug> SchemaCacheMap<I, S> {
    fn get_or_try_init<E>(
        &self,
        id: &I,
        state_fn: impl FnOnce() -> Result<S, E>,
    ) -> Result<Arc<S>, E> {
        // First look up the map with only the read lock.
        {
            let map = self.map.read().expect("lock");
            if let Some(ret) = map.get(id).map(Arc::clone) {
                self.metrics.cached_count.inc();
                return Ok(ret);
            }
        }
        // If it wasn't there, compute it (without holding any lock) and then
        // take the write lock to insert it.
        let ret = state_fn().map(Arc::new);
        if let Ok(val) = ret.as_ref() {
            let mut map = self.map.write().expect("lock");
            let prev = map.insert(id.clone(), Arc::clone(val));
            // A racing insert for the same id must have computed an equal
            // value.
            match prev {
                Some(prev) => debug_assert_eq!(*val, prev),
                None => self.metrics.added_count.inc(),
            }
        } else {
            self.metrics.unavailable_count.inc();
        }
        ret
    }
}
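
// Usage sketch for the double-checked lookup above (the value and the `()`
// error type are illustrative):
//
//     let v = map.get_or_try_init(&id, || Ok::<_, ()>(decoded_schema)).unwrap();
//     // A second lookup of the same id returns the cached Arc without
//     // invoking the closure.
//     let v2 = map.get_or_try_init(&id, || Err(())).unwrap();
//     assert!(Arc::ptr_eq(&v, &v2));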

impl<I, K> Drop for SchemaCacheMap<I, K> {
    fn drop(&mut self) {
        let map = self.map.read().expect("lock");
        self.metrics.dropped_count.inc_by(u64::cast_from(map.len()));
    }
}

/// A per-instance cache of [Migration]s, keyed by (write schema id, read
/// schema id).
#[derive(Debug, Clone)]
struct MigrationCacheMap {
    metrics: SchemaCacheMetrics,
    by_ids: BTreeMap<(SchemaId, SchemaId), Arc<Migration>>,
}

impl MigrationCacheMap {
    fn get_or_try_insert(
        &mut self,
        write_id: SchemaId,
        read_id: SchemaId,
        migration_fn: impl FnOnce() -> Option<Migration>,
    ) -> Option<Arc<Migration>> {
        if let Some(migration) = self.by_ids.get(&(write_id, read_id)) {
            self.metrics.cached_count.inc();
            return Some(Arc::clone(migration));
        };
        self.metrics.computed_count.inc();
        let migration = migration_fn().map(Arc::new);
        if let Some(migration) = migration.as_ref() {
            self.metrics.added_count.inc();
            // Only successful computations are cached; a `None` (incompatible)
            // result is recomputed on each call.
            self.by_ids
                .insert((write_id, read_id), Arc::clone(migration));
        } else {
            self.metrics.unavailable_count.inc();
        }
        migration
    }
}

/// The schema migration, if any, to apply to a part written with one schema so
/// it can be read with another.
#[derive(Debug)]
pub(crate) enum PartMigration<K: Codec, V: Codec> {
    /// The write and read schemas are identical: no migration needed.
    SameSchema { both: Schemas<K, V> },
    /// The part doesn't name a write schema; decode with the read schema.
    Schemaless { read: Schemas<K, V> },
    /// The write and read schemas differ: migrate using the computed
    /// [Migration]s.
    Either {
        /// The schema the part was written with.
        write: Schemas<K, V>,
        /// The schema the part is being read with.
        read: Schemas<K, V>,
        /// Migration from the written key schema to the read key schema.
        key_migration: Arc<Migration>,
        /// Migration from the written val schema to the read val schema.
        val_migration: Arc<Migration>,
    },
}
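
// A fetch-side sketch of applying a `PartMigration` to a part's decoded key
// column (`key_array` is a hypothetical `arrow` array; assumes
// `Migration::migrate`):
//
//     let key_array = match &part_migration {
//         PartMigration::SameSchema { .. } | PartMigration::Schemaless { .. } => key_array,
//         PartMigration::Either { key_migration, .. } => key_migration.migrate(key_array),
//     };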

// By-hand impl so we don't require `K` or `V` to be `Clone`.
impl<K: Codec, V: Codec> Clone for PartMigration<K, V> {
    fn clone(&self) -> Self {
        match self {
            Self::SameSchema { both } => Self::SameSchema { both: both.clone() },
            Self::Schemaless { read } => Self::Schemaless { read: read.clone() },
            Self::Either {
                write,
                read,
                key_migration,
                val_migration,
            } => Self::Either {
                write: write.clone(),
                read: read.clone(),
                key_migration: Arc::clone(key_migration),
                val_migration: Arc::clone(val_migration),
            },
        }
    }
}

impl<K, V> PartMigration<K, V>
where
    K: Debug + Codec,
    V: Debug + Codec,
{
    pub(crate) async fn new<T, D>(
        part: &BatchPart<T>,
        read: Schemas<K, V>,
        schema_cache: &mut SchemaCache<K, V, T, D>,
    ) -> Result<Self, Schemas<K, V>>
    where
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Codec64,
    {
        // Prefer the part's schema id. If it only has a deprecated schema id,
        // fall back to it, but only for structured-only parts.
        let write = match (part.schema_id(), part.deprecated_schema_id()) {
            (Some(write_id), _) => Some(write_id),
            (None, Some(deprecated_id))
                if part.is_structured_only(&schema_cache.applier.metrics.columnar) =>
            {
                tracing::warn!(?deprecated_id, "falling back to deprecated schema ID");
                Some(deprecated_id)
            }
            (None, _) => None,
        };

        match (write, read.id) {
            (None, _) => Ok(PartMigration::Schemaless { read }),
            (Some(w), Some(r)) if w == r => Ok(PartMigration::SameSchema { both: read }),
            (Some(w), _) => {
                let write = schema_cache
                    .schemas(&w)
                    .await
                    .expect("appended part should reference registered schema");
                // Even when the ids differ, the schemas themselves may still
                // be equal; skip the migration machinery in that case.
                if write.key == read.key && write.val == read.val {
                    return Ok(PartMigration::SameSchema { both: read });
                }

                let start = Instant::now();
                let key_migration = schema_cache
                    .key_migration(&write, &read)
                    .ok_or_else(|| read.clone())?;
                let val_migration = schema_cache
                    .val_migration(&write, &read)
                    .ok_or_else(|| read.clone())?;
                schema_cache
                    .applier
                    .metrics
                    .schema
                    .migration_new_count
                    .inc();
                schema_cache
                    .applier
                    .metrics
                    .schema
                    .migration_new_seconds
                    .inc_by(start.elapsed().as_secs_f64());

                Ok(PartMigration::Either {
                    write,
                    read,
                    key_migration,
                    val_migration,
                })
            }
        }
    }
}

impl<K: Codec, V: Codec> PartMigration<K, V> {
    /// The schemas to use when decoding the part's `Codec`-encoded data:
    /// always the read schemas.
    pub(crate) fn codec_read(&self) -> &Schemas<K, V> {
        match self {
            PartMigration::SameSchema { both } => both,
            PartMigration::Schemaless { read } => read,
            PartMigration::Either { read, .. } => read,
        }
    }
}
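
// Note that the `Codec`-encoded portion of a part is always decoded directly
// with the read schema: `Codec::decode` takes the target schema as an
// argument, so any reshaping happens inside the codec itself (see
// `Strings::decode` in the tests below).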

#[cfg(test)]
mod tests {
    use arrow::array::{
        Array, ArrayBuilder, StringArray, StringBuilder, StructArray, as_string_array,
    };
    use arrow::datatypes::{DataType, Field};
    use bytes::BufMut;
    use futures::StreamExt;
    use mz_dyncfg::ConfigUpdates;
    use mz_persist_types::ShardId;
    use mz_persist_types::arrow::ArrayOrd;
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema};
    use mz_persist_types::stats::{NoneStats, StructStats};
    use timely::progress::Antichain;

    use crate::Diagnostics;
    use crate::cli::admin::info_log_non_zero_metrics;
    use crate::read::ReadHandle;
    use crate::tests::new_test_client;

    use super::*;

    #[mz_ore::test]
    fn schema_id() {
        assert_eq!(SchemaId(1).to_string(), "h1");
        assert_eq!(SchemaId::try_from("h1".to_owned()), Ok(SchemaId(1)));
        assert!(SchemaId::try_from("nope".to_owned()).is_err());
    }

    /// A test value: a tuple of strings, encoded as comma-separated bytes.
    #[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
    struct Strings(Vec<String>);

    impl Codec for Strings {
        type Schema = StringsSchema;
        type Storage = ();

        fn codec_name() -> String {
            "Strings".into()
        }

        fn encode<B: BufMut>(&self, buf: &mut B) {
            buf.put_slice(self.0.join(",").as_bytes());
        }
        fn decode<'a>(buf: &'a [u8], schema: &Self::Schema) -> Result<Self, String> {
            let buf = std::str::from_utf8(buf).map_err(|err| err.to_string())?;
            let mut ret = buf.split(",").map(|x| x.to_owned()).collect::<Vec<_>>();
            // Pad and truncate to the arity of the schema we're decoding with.
            while schema.0.len() > ret.len() {
                ret.push("".into());
            }
            while schema.0.len() < ret.len() {
                ret.pop();
            }
            Ok(Strings(ret))
        }

        fn encode_schema(schema: &Self::Schema) -> bytes::Bytes {
            schema
                .0
                .iter()
                .map(|x| x.then_some('n').unwrap_or(' '))
                .collect::<String>()
                .into_bytes()
                .into()
        }
        fn decode_schema(buf: &bytes::Bytes) -> Self::Schema {
            let buf = std::str::from_utf8(buf).expect("valid schema");
            StringsSchema(
                buf.chars()
                    .map(|x| match x {
                        'n' => true,
                        ' ' => false,
                        _ => unreachable!(),
                    })
                    .collect(),
            )
        }
    }
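
    // For intuition, with `schema = StringsSchema(vec![false, true])` (two
    // columns, the second nullable):
    //
    //     let mut buf = Vec::new();
    //     Strings(vec!["a".into(), "b".into()]).encode(&mut buf);
    //     assert_eq!(&buf, b"a,b");
    //     // Decoding data written with a narrower schema pads with "":
    //     let decoded = Strings::decode(b"a", &schema);
    //     assert_eq!(decoded, Ok(Strings(vec!["a".into(), "".into()])));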

    /// A schema for [Strings]: the per-column nullability, with the arity
    /// implied by the length.
    #[derive(Debug, Clone, Default, PartialEq)]
    struct StringsSchema(Vec<bool>);

    impl Schema<Strings> for StringsSchema {
        type ArrowColumn = StructArray;
        type Statistics = NoneStats;
        type Decoder = StringsDecoder;
        type Encoder = StringsEncoder;

        fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
            let mut cols = Vec::new();
            for (idx, _) in self.0.iter().enumerate() {
                cols.push(as_string_array(col.column_by_name(&idx.to_string()).unwrap()).clone());
            }
            Ok(StringsDecoder(cols))
        }
        fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
            let mut fields = Vec::new();
            let mut arrays = Vec::new();
            for (idx, nullable) in self.0.iter().enumerate() {
                fields.push(Field::new(idx.to_string(), DataType::Utf8, *nullable));
                arrays.push(StringBuilder::new());
            }
            Ok(StringsEncoder { fields, arrays })
        }
    }

    #[derive(Debug)]
    struct StringsDecoder(Vec<StringArray>);
    impl ColumnDecoder<Strings> for StringsDecoder {
        fn decode(&self, idx: usize, val: &mut Strings) {
            val.0.clear();
            for col in self.0.iter() {
                if col.is_valid(idx) {
                    val.0.push(col.value(idx).into());
                } else {
                    // Nulls round-trip as the empty string.
                    val.0.push("".into());
                }
            }
        }
        fn is_null(&self, _: usize) -> bool {
            false
        }
        fn goodbytes(&self) -> usize {
            self.0
                .iter()
                .map(|val| ArrayOrd::String(val.clone()).goodbytes())
                .sum()
        }
        fn stats(&self) -> StructStats {
            StructStats {
                len: self.0[0].len(),
                cols: Default::default(),
            }
        }
    }

    #[derive(Debug)]
    struct StringsEncoder {
        fields: Vec<Field>,
        arrays: Vec<StringBuilder>,
    }
    impl ColumnEncoder<Strings> for StringsEncoder {
        type FinishedColumn = StructArray;

        fn goodbytes(&self) -> usize {
            self.arrays.iter().map(|a| a.values_slice().len()).sum()
        }

        fn append(&mut self, val: &Strings) {
            for (idx, val) in val.0.iter().enumerate() {
                // The empty string encodes as null.
                if val.is_empty() {
                    self.arrays[idx].append_null();
                } else {
                    self.arrays[idx].append_value(val);
                }
            }
        }
        fn append_null(&mut self) {
            unreachable!()
        }
        fn finish(self) -> Self::FinishedColumn {
            assert_eq!(self.fields.len(), self.arrays.len(), "invalid schema");
            if self.fields.is_empty() {
                StructArray::new_empty_fields(0, None)
            } else {
                let arrays = self
                    .arrays
                    .into_iter()
                    .map(|mut x| ArrayBuilder::finish(&mut x))
                    .collect();
                StructArray::new(self.fields.into(), arrays, None)
            }
        }
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn compare_and_evolve_schema(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let d = Diagnostics::for_tests();
        let shard_id = ShardId::new();
        let schema0 = StringsSchema(vec![false]);
        let schema1 = StringsSchema(vec![false, true]);

        let mut write0 = client
            .open_writer::<Strings, (), u64, i64>(
                shard_id,
                Arc::new(schema0.clone()),
                Arc::new(UnitSchema),
                d.clone(),
            )
            .await
            .unwrap();

        write0.ensure_schema_registered().await;
        assert_eq!(write0.write_schemas.id.unwrap(), SchemaId(0));

        // Dropping a column is not backward compatible.
        let res = client
            .compare_and_evolve_schema::<Strings, (), u64, i64>(
                shard_id,
                SchemaId(0),
                &StringsSchema(vec![]),
                &UnitSchema,
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(res, CaESchema::Incompatible);

        // A compatible evolution with the wrong expected id returns the
        // current schema.
        let res = client
            .compare_and_evolve_schema::<Strings, (), u64, i64>(
                shard_id,
                SchemaId(1),
                &schema1,
                &UnitSchema,
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(
            res,
            CaESchema::ExpectedMismatch {
                schema_id: SchemaId(0),
                key: schema0,
                val: UnitSchema
            }
        );

        // Adding a nullable column is backward compatible.
        let res = client
            .compare_and_evolve_schema::<Strings, (), u64, i64>(
                shard_id,
                SchemaId(0),
                &schema1,
                &UnitSchema,
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(res, CaESchema::Ok(SchemaId(1)));

        // A writer opened with the evolved schema picks up its id.
        let write1 = client
            .open_writer::<Strings, (), u64, i64>(
                shard_id,
                Arc::new(schema1),
                Arc::new(UnitSchema),
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(write1.write_schemas.id.unwrap(), SchemaId(1));
    }

    fn strings(xs: &[((Result<Strings, String>, Result<(), String>), u64, i64)]) -> Vec<Vec<&str>> {
        xs.iter()
            .map(|((k, _), _, _)| k.as_ref().unwrap().0.iter().map(|x| x.as_str()).collect())
            .collect()
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn schema_evolution(dyncfgs: ConfigUpdates) {
        // Snapshot via the streaming API, sorted for deterministic comparison.
        async fn snap_streaming(
            as_of: u64,
            read: &mut ReadHandle<Strings, (), u64, i64>,
        ) -> Vec<((Result<Strings, String>, Result<(), String>), u64, i64)> {
            let mut ret = read
                .snapshot_and_stream(Antichain::from_elem(as_of))
                .await
                .unwrap()
                .collect::<Vec<_>>()
                .await;
            ret.sort();
            ret
        }

        let client = new_test_client(&dyncfgs).await;
        let d = Diagnostics::for_tests();
        let shard_id = ShardId::new();
        let schema0 = StringsSchema(vec![false]);
        let schema1 = StringsSchema(vec![false, true]);

        // Start with schema0 and write some data.
        let (mut write0, mut read0) = client
            .open::<Strings, (), u64, i64>(
                shard_id,
                Arc::new(schema0.clone()),
                Arc::new(UnitSchema),
                d.clone(),
                true,
            )
            .await
            .unwrap();
        write0
            .expect_compare_and_append(&[((Strings(vec!["0 before".into()]), ()), 0, 1)], 0, 1)
            .await;
        let expected = vec![vec!["0 before"]];
        assert_eq!(strings(&snap_streaming(0, &mut read0).await), expected);
        assert_eq!(strings(&read0.expect_snapshot_and_fetch(0).await), expected);

        // Evolve the schema to schema1 and write data with the new handle.
        let res = client
            .compare_and_evolve_schema::<Strings, (), u64, i64>(
                shard_id,
                SchemaId(0),
                &schema1,
                &UnitSchema,
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(res, CaESchema::Ok(SchemaId(1)));
        let (mut write1, mut read1) = client
            .open::<Strings, (), u64, i64>(
                shard_id,
                Arc::new(schema1.clone()),
                Arc::new(UnitSchema),
                d.clone(),
                true,
            )
            .await
            .unwrap();
        write1
            .expect_compare_and_append(
                &[
                    ((Strings(vec!["1 null".into(), "".into()]), ()), 1, 1),
                    ((Strings(vec!["1 not".into(), "x".into()]), ()), 1, 1),
                ],
                1,
                2,
            )
            .await;

        // The original write handle (still on schema0) continues to work.
        write0
            .expect_compare_and_append(&[((Strings(vec!["0 after".into()]), ()), 2, 1)], 2, 3)
            .await;

        // A reader on schema0 sees everything projected onto schema0: the
        // column added by schema1 is dropped.
        let expected = vec![
            vec!["0 after"],
            vec!["0 before"],
            vec!["1 not"],
            vec!["1 null"],
        ];
        assert_eq!(strings(&snap_streaming(2, &mut read0).await), expected);
        assert_eq!(strings(&read0.expect_snapshot_and_fetch(2).await), expected);

        // A reader on schema1 sees data written with schema0 padded with
        // nulls (which round-trip as "") in the new column.
        let expected = vec![
            vec!["0 after", ""],
            vec!["0 before", ""],
            vec!["1 not", "x"],
            vec!["1 null", ""],
        ];
        assert_eq!(strings(&snap_streaming(2, &mut read1).await), expected);
        assert_eq!(strings(&read1.expect_snapshot_and_fetch(2).await), expected);

        // Flip to true when debugging this test to dump the metrics.
        if false {
            info_log_non_zero_metrics(&client.metrics.registry.gather());
        }
    }
}