1use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::sync::{Arc, RwLock};
15use std::time::Instant;
16
17use differential_dataflow::difference::Monoid;
18use differential_dataflow::lattice::Lattice;
19use mz_ore::cast::CastFrom;
20use mz_persist_types::columnar::data_type;
21use mz_persist_types::schema::{Migration, SchemaId, backward_compatible};
22use mz_persist_types::{Codec, Codec64};
23use timely::progress::Timestamp;
24
25use crate::internal::apply::Applier;
26use crate::internal::encoding::Schemas;
27use crate::internal::metrics::{SchemaCacheMetrics, SchemaMetrics};
28use crate::internal::state::{BatchPart, EncodedSchemas};
29
30#[derive(Debug)]
32#[cfg_attr(test, derive(PartialEq))]
33pub enum CaESchema<K: Codec, V: Codec> {
34 Ok(SchemaId),
36 Incompatible,
38 ExpectedMismatch {
41 schema_id: SchemaId,
43 key: K::Schema,
45 val: V::Schema,
47 },
48}
49
50#[derive(Debug)]
60pub(crate) struct SchemaCache<K: Codec, V: Codec, T, D> {
61 maps: Arc<SchemaCacheMaps<K, V>>,
62 applier: Applier<K, V, T, D>,
63 key_migration_by_ids: MigrationCacheMap,
64 val_migration_by_ids: MigrationCacheMap,
65}
66
67impl<K: Codec, V: Codec, T: Clone, D> Clone for SchemaCache<K, V, T, D> {
68 fn clone(&self) -> Self {
69 Self {
70 maps: Arc::clone(&self.maps),
71 applier: self.applier.clone(),
72 key_migration_by_ids: self.key_migration_by_ids.clone(),
73 val_migration_by_ids: self.val_migration_by_ids.clone(),
74 }
75 }
76}
77
78impl<K: Codec, V: Codec, T, D> Drop for SchemaCache<K, V, T, D> {
79 fn drop(&mut self) {
80 let dropped = u64::cast_from(
81 self.key_migration_by_ids.by_ids.len() + self.val_migration_by_ids.by_ids.len(),
82 );
83 self.applier
84 .metrics
85 .schema
86 .cache_migration
87 .dropped_count
88 .inc_by(dropped);
89 }
90}
91
92impl<K, V, T, D> SchemaCache<K, V, T, D>
93where
94 K: Debug + Codec,
95 V: Debug + Codec,
96 T: Timestamp + Lattice + Codec64 + Sync,
97 D: Monoid + Codec64,
98{
99 pub(crate) fn applier(&self) -> &Applier<K, V, T, D> {
101 &self.applier
102 }
103
104 pub fn new(maps: Arc<SchemaCacheMaps<K, V>>, applier: Applier<K, V, T, D>) -> Self {
105 let key_migration_by_ids = MigrationCacheMap {
106 metrics: applier.metrics.schema.cache_migration.clone(),
107 by_ids: BTreeMap::new(),
108 };
109 let val_migration_by_ids = MigrationCacheMap {
110 metrics: applier.metrics.schema.cache_migration.clone(),
111 by_ids: BTreeMap::new(),
112 };
113 SchemaCache {
114 maps,
115 applier,
116 key_migration_by_ids,
117 val_migration_by_ids,
118 }
119 }
120
121 async fn schemas(&self, id: &SchemaId) -> Option<Schemas<K, V>> {
122 let key = self
123 .get_or_try_init(&self.maps.key_by_id, id, |schemas| {
124 self.maps.key_by_id.metrics.computed_count.inc();
125 schemas.get(id).map(|x| K::decode_schema(&x.key))
126 })
127 .await?;
128 let val = self
129 .get_or_try_init(&self.maps.val_by_id, id, |schemas| {
130 self.maps.val_by_id.metrics.computed_count.inc();
131 schemas.get(id).map(|x| V::decode_schema(&x.val))
132 })
133 .await?;
134 Some(Schemas {
135 id: Some(*id),
136 key,
137 val,
138 })
139 }
140
141 fn key_migration(
142 &mut self,
143 write: &Schemas<K, V>,
144 read: &Schemas<K, V>,
145 ) -> Option<Arc<Migration>> {
146 let migration_fn = || Self::migration::<K>(&write.key, &read.key);
147 let (Some(write_id), Some(read_id)) = (write.id, read.id) else {
148 self.key_migration_by_ids.metrics.computed_count.inc();
152 return migration_fn().map(Arc::new);
153 };
154 self.key_migration_by_ids
155 .get_or_try_insert(write_id, read_id, migration_fn)
156 }
157
158 fn val_migration(
159 &mut self,
160 write: &Schemas<K, V>,
161 read: &Schemas<K, V>,
162 ) -> Option<Arc<Migration>> {
163 let migration_fn = || Self::migration::<V>(&write.val, &read.val);
164 let (Some(write_id), Some(read_id)) = (write.id, read.id) else {
165 self.val_migration_by_ids.metrics.computed_count.inc();
169 return migration_fn().map(Arc::new);
170 };
171 self.val_migration_by_ids
172 .get_or_try_insert(write_id, read_id, migration_fn)
173 }
174
175 fn migration<C: Codec>(write: &C::Schema, read: &C::Schema) -> Option<Migration> {
176 let write_dt = data_type::<C>(write).expect("valid schema");
177 let read_dt = data_type::<C>(read).expect("valid schema");
178 backward_compatible(&write_dt, &read_dt)
179 }
180
181 async fn get_or_try_init<MK: Clone + Ord, MV: PartialEq + Debug>(
182 &self,
183 map: &SchemaCacheMap<MK, MV>,
184 key: &MK,
185 f: impl Fn(&BTreeMap<SchemaId, EncodedSchemas>) -> Option<MV>,
186 ) -> Option<Arc<MV>> {
187 let ret = map.get_or_try_init(key, || {
188 self.applier
189 .schemas(|seqno, schemas| f(schemas).ok_or(seqno))
190 });
191 let seqno = match ret {
192 Ok(ret) => return Some(ret),
193 Err(seqno) => seqno,
194 };
195 self.applier.metrics.schema.cache_fetch_state_count.inc();
196 self.applier.fetch_and_update_state(Some(seqno)).await;
197 map.get_or_try_init(key, || {
198 self.applier
199 .schemas(|seqno, schemas| f(schemas).ok_or(seqno))
200 })
201 .ok()
202 }
203}
204
205#[derive(Debug)]
206pub(crate) struct SchemaCacheMaps<K: Codec, V: Codec> {
207 key_by_id: SchemaCacheMap<SchemaId, K::Schema>,
208 val_by_id: SchemaCacheMap<SchemaId, V::Schema>,
209}
210
211impl<K: Codec, V: Codec> SchemaCacheMaps<K, V> {
212 pub(crate) fn new(metrics: &SchemaMetrics) -> Self {
213 Self {
214 key_by_id: SchemaCacheMap {
215 metrics: metrics.cache_schema.clone(),
216 map: RwLock::new(BTreeMap::new()),
217 },
218 val_by_id: SchemaCacheMap {
219 metrics: metrics.cache_schema.clone(),
220 map: RwLock::new(BTreeMap::new()),
221 },
222 }
223 }
224}
225
226#[derive(Debug)]
227struct SchemaCacheMap<I, S> {
228 metrics: SchemaCacheMetrics,
229 map: RwLock<BTreeMap<I, Arc<S>>>,
230}
231
232impl<I: Clone + Ord, S: PartialEq + Debug> SchemaCacheMap<I, S> {
233 fn get_or_try_init<E>(
234 &self,
235 id: &I,
236 state_fn: impl FnOnce() -> Result<S, E>,
237 ) -> Result<Arc<S>, E> {
238 {
240 let map = self.map.read().expect("lock");
241 if let Some(ret) = map.get(id).map(Arc::clone) {
242 self.metrics.cached_count.inc();
243 return Ok(ret);
244 }
245 }
246 let ret = state_fn().map(Arc::new);
248 if let Ok(val) = ret.as_ref() {
249 let mut map = self.map.write().expect("lock");
250 let prev = map.insert(id.clone(), Arc::clone(val));
253 match prev {
254 Some(prev) => debug_assert_eq!(*val, prev),
255 None => self.metrics.added_count.inc(),
256 }
257 } else {
258 self.metrics.unavailable_count.inc();
259 }
260 ret
261 }
262}
263
264impl<I, K> Drop for SchemaCacheMap<I, K> {
265 fn drop(&mut self) {
266 let map = self.map.read().expect("lock");
267 self.metrics.dropped_count.inc_by(u64::cast_from(map.len()));
268 }
269}
270
271#[derive(Debug, Clone)]
272struct MigrationCacheMap {
273 metrics: SchemaCacheMetrics,
274 by_ids: BTreeMap<(SchemaId, SchemaId), Arc<Migration>>,
275}
276
277impl MigrationCacheMap {
278 fn get_or_try_insert(
279 &mut self,
280 write_id: SchemaId,
281 read_id: SchemaId,
282 migration_fn: impl FnOnce() -> Option<Migration>,
283 ) -> Option<Arc<Migration>> {
284 if let Some(migration) = self.by_ids.get(&(write_id, read_id)) {
285 self.metrics.cached_count.inc();
286 return Some(Arc::clone(migration));
287 };
288 self.metrics.computed_count.inc();
289 let migration = migration_fn().map(Arc::new);
290 if let Some(migration) = migration.as_ref() {
291 self.metrics.added_count.inc();
292 self.by_ids
295 .insert((write_id, read_id), Arc::clone(migration));
296 } else {
297 self.metrics.unavailable_count.inc();
298 }
299 migration
300 }
301}
302
303#[derive(Debug)]
304pub(crate) enum PartMigration<K: Codec, V: Codec> {
305 SameSchema { both: Schemas<K, V> },
307 Schemaless { read: Schemas<K, V> },
309 Either {
311 write: Schemas<K, V>,
312 read: Schemas<K, V>,
313 key_migration: Arc<Migration>,
314 val_migration: Arc<Migration>,
315 },
316}
317
318impl<K: Codec, V: Codec> Clone for PartMigration<K, V> {
319 fn clone(&self) -> Self {
320 match self {
321 Self::SameSchema { both } => Self::SameSchema { both: both.clone() },
322 Self::Schemaless { read } => Self::Schemaless { read: read.clone() },
323 Self::Either {
324 write,
325 read,
326 key_migration,
327 val_migration,
328 } => Self::Either {
329 write: write.clone(),
330 read: read.clone(),
331 key_migration: Arc::clone(key_migration),
332 val_migration: Arc::clone(val_migration),
333 },
334 }
335 }
336}
337
338impl<K, V> PartMigration<K, V>
339where
340 K: Debug + Codec,
341 V: Debug + Codec,
342{
343 pub(crate) async fn new<T, D>(
344 part: &BatchPart<T>,
345 read: Schemas<K, V>,
346 schema_cache: &mut SchemaCache<K, V, T, D>,
347 ) -> Result<Self, Schemas<K, V>>
348 where
349 T: Timestamp + Lattice + Codec64 + Sync,
350 D: Monoid + Codec64,
351 {
352 let write = match (part.schema_id(), part.deprecated_schema_id()) {
361 (Some(write_id), _) => Some(write_id),
362 (None, Some(deprecated_id))
363 if part.is_structured_only(&schema_cache.applier.metrics.columnar) =>
364 {
365 tracing::warn!(?deprecated_id, "falling back to deprecated schema ID");
366 Some(deprecated_id)
367 }
368 (None, _) => None,
369 };
370
371 match (write, read.id) {
372 (None, _) => Ok(PartMigration::Schemaless { read }),
373 (Some(w), Some(r)) if w == r => Ok(PartMigration::SameSchema { both: read }),
374 (Some(w), _) => {
375 let write = schema_cache
376 .schemas(&w)
377 .await
378 .expect("appended part should reference registered schema");
379 if write.key == read.key && write.val == read.val {
384 return Ok(PartMigration::SameSchema { both: read });
385 }
386
387 let start = Instant::now();
388 let key_migration = schema_cache
389 .key_migration(&write, &read)
390 .ok_or_else(|| read.clone())?;
391 let val_migration = schema_cache
392 .val_migration(&write, &read)
393 .ok_or_else(|| read.clone())?;
394 schema_cache
395 .applier
396 .metrics
397 .schema
398 .migration_new_count
399 .inc();
400 schema_cache
401 .applier
402 .metrics
403 .schema
404 .migration_new_seconds
405 .inc_by(start.elapsed().as_secs_f64());
406
407 Ok(PartMigration::Either {
408 write,
409 read,
410 key_migration,
411 val_migration,
412 })
413 }
414 }
415 }
416}
417
418impl<K: Codec, V: Codec> PartMigration<K, V> {
419 pub(crate) fn codec_read(&self) -> &Schemas<K, V> {
420 match self {
421 PartMigration::SameSchema { both } => both,
422 PartMigration::Schemaless { read } => read,
423 PartMigration::Either { read, .. } => read,
424 }
425 }
426}
427
#[cfg(test)]
mod tests {
    use arrow::array::{
        Array, ArrayBuilder, StringArray, StringBuilder, StructArray, as_string_array,
    };
    use arrow::datatypes::{DataType, Field};
    use bytes::BufMut;
    use futures::StreamExt;
    use mz_dyncfg::ConfigUpdates;
    use mz_persist_types::ShardId;
    use mz_persist_types::arrow::ArrayOrd;
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema};
    use mz_persist_types::stats::{NoneStats, StructStats};
    use timely::progress::Antichain;

    use crate::Diagnostics;
    use crate::cli::admin::info_log_non_zero_metrics;
    use crate::read::ReadHandle;
    use crate::tests::new_test_client;

    use super::*;

    /// Round-trips `SchemaId` through its string form (`h<n>`).
    #[mz_ore::test]
    fn schema_id() {
        assert_eq!(SchemaId(1).to_string(), "h1");
        assert_eq!(SchemaId::try_from("h1".to_owned()), Ok(SchemaId(1)));
        assert!(SchemaId::try_from("nope".to_owned()).is_err());
    }

    /// Test key type: one string per column. In the columnar encoding an empty
    /// string stands for a null column value.
    #[derive(Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
    struct Strings(Vec<String>);

    impl Codec for Strings {
        type Schema = StringsSchema;
        type Storage = ();

        fn codec_name() -> String {
            "Strings".into()
        }

        // Encodes the columns as a single comma-separated string.
        fn encode<B: BufMut>(&self, buf: &mut B) {
            buf.put_slice(self.0.join(",").as_bytes());
        }
        // Decodes a comma-separated string, then pads or truncates the columns
        // to the number of columns in `schema`.
        fn decode<'a>(buf: &'a [u8], schema: &Self::Schema) -> Result<Self, String> {
            let buf = std::str::from_utf8(buf).map_err(|err| err.to_string())?;
            let mut ret = buf.split(",").map(|x| x.to_owned()).collect::<Vec<_>>();
            // Columns the data lacks (e.g. ones added by a later schema) are
            // filled in with empty strings.
            while schema.0.len() > ret.len() {
                ret.push("".into());
            }
            // Columns the schema doesn't have are dropped.
            while schema.0.len() < ret.len() {
                ret.pop();
            }
            Ok(Strings(ret))
        }

        // Encodes the schema as one char per column: 'n' if nullable, ' '
        // otherwise.
        fn encode_schema(schema: &Self::Schema) -> bytes::Bytes {
            schema
                .0
                .iter()
                .map(|x| x.then_some('n').unwrap_or(' '))
                .collect::<String>()
                .into_bytes()
                .into()
        }
        // Inverse of `encode_schema`; panics on any other character.
        fn decode_schema(buf: &bytes::Bytes) -> Self::Schema {
            let buf = std::str::from_utf8(buf).expect("valid schema");
            StringsSchema(
                buf.chars()
                    .map(|x| match x {
                        'n' => true,
                        ' ' => false,
                        _ => unreachable!(),
                    })
                    .collect(),
            )
        }
    }

    /// Schema for [`Strings`]: one entry per column, `true` if that column is
    /// nullable.
    #[derive(Debug, Clone, Default, PartialEq)]
    struct StringsSchema(Vec<bool>);

    impl Schema<Strings> for StringsSchema {
        type ArrowColumn = StructArray;
        type Statistics = NoneStats;
        type Decoder = StringsDecoder;
        type Encoder = StringsEncoder;

        // Reads one string child column per schema column, named by its index
        // ("0", "1", ...). Panics if a child column is missing.
        fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
            let mut cols = Vec::new();
            for (idx, _) in self.0.iter().enumerate() {
                cols.push(as_string_array(col.column_by_name(&idx.to_string()).unwrap()).clone());
            }
            Ok(StringsDecoder(cols))
        }
        // Creates one Utf8 field per schema column, named by its index and
        // carrying that column's nullability.
        fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
            let mut fields = Vec::new();
            let mut arrays = Vec::new();
            for (idx, nullable) in self.0.iter().enumerate() {
                fields.push(Field::new(idx.to_string(), DataType::Utf8, *nullable));
                arrays.push(StringBuilder::new());
            }
            Ok(StringsEncoder { fields, arrays })
        }
    }

    /// Decoder over the string child columns of a [`Strings`] struct array.
    #[derive(Debug)]
    struct StringsDecoder(Vec<StringArray>);
    impl ColumnDecoder<Strings> for StringsDecoder {
        // Decodes row `idx`, mapping null column values to empty strings.
        fn decode(&self, idx: usize, val: &mut Strings) {
            val.0.clear();
            for col in self.0.iter() {
                if col.is_valid(idx) {
                    val.0.push(col.value(idx).into());
                } else {
                    val.0.push("".into());
                }
            }
        }
        // Rows are never null as a whole; only individual columns can be.
        fn is_null(&self, _: usize) -> bool {
            false
        }
        fn goodbytes(&self) -> usize {
            self.0
                .iter()
                .map(|val| ArrayOrd::String(val.clone()).goodbytes())
                .sum()
        }
        // Reports only the row count, with no per-column stats.
        fn stats(&self) -> StructStats {
            // NOTE(review): indexes `self.0[0]`, so this panics for a schema
            // with zero columns.
            StructStats {
                len: self.0[0].len(),
                cols: Default::default(),
            }
        }
    }

    /// Encoder that builds one string column per schema column.
    #[derive(Debug)]
    struct StringsEncoder {
        fields: Vec<Field>,
        arrays: Vec<StringBuilder>,
    }
    impl ColumnEncoder<Strings> for StringsEncoder {
        type FinishedColumn = StructArray;

        fn goodbytes(&self) -> usize {
            self.arrays.iter().map(|a| a.values_slice().len()).sum()
        }

        // Appends one row; empty strings are encoded as nulls.
        fn append(&mut self, val: &Strings) {
            // NOTE(review): indexes `self.arrays` by the value's column index,
            // so a value with more columns than the schema panics.
            for (idx, val) in val.0.iter().enumerate() {
                if val.is_empty() {
                    self.arrays[idx].append_null();
                } else {
                    self.arrays[idx].append_value(val);
                }
            }
        }
        // Whole-row nulls are not supported by this test type.
        fn append_null(&mut self) {
            unreachable!()
        }
        // Builds the struct array. With no columns this returns a struct array
        // of length 0 regardless of how many rows were appended.
        fn finish(self) -> Self::FinishedColumn {
            assert_eq!(self.fields.len(), self.arrays.len(), "invalid schema");
            if self.fields.is_empty() {
                StructArray::new_empty_fields(0, None)
            } else {
                let arrays = self
                    .arrays
                    .into_iter()
                    .map(|mut x| ArrayBuilder::finish(&mut x))
                    .collect();
                StructArray::new(self.fields.into(), arrays, None)
            }
        }
    }

    /// Exercises `compare_and_evolve_schema`: a rejected schema, a stale
    /// expected schema id, and a successful evolution.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn compare_and_evolve_schema(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let d = Diagnostics::for_tests();
        let shard_id = ShardId::new();
        let schema0 = StringsSchema(vec![false]);
        let schema1 = StringsSchema(vec![false, true]);

        let mut write0 = client
            .open_writer::<Strings, (), u64, i64>(
                shard_id,
                Arc::new(schema0.clone()),
                Arc::new(UnitSchema),
                d.clone(),
            )
            .await
            .unwrap();

        // The first registered schema gets id 0.
        write0.try_register_schema().await;
        assert_eq!(write0.write_schemas.id.unwrap(), SchemaId(0));

        // Evolving to a schema with no columns is rejected as incompatible.
        let res = client
            .compare_and_evolve_schema::<Strings, (), u64, i64>(
                shard_id,
                SchemaId(0),
                &StringsSchema(vec![]),
                &UnitSchema,
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(res, CaESchema::Incompatible);

        // A wrong expected schema id reports the shard's actual current schema.
        let res = client
            .compare_and_evolve_schema::<Strings, (), u64, i64>(
                shard_id,
                SchemaId(1),
                &schema1,
                &UnitSchema,
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(
            res,
            CaESchema::ExpectedMismatch {
                schema_id: SchemaId(0),
                key: schema0,
                val: UnitSchema
            }
        );

        // With the correct expected id, adding a nullable column succeeds and
        // registers the new schema as id 1.
        let res = client
            .compare_and_evolve_schema::<Strings, (), u64, i64>(
                shard_id,
                SchemaId(0),
                &schema1,
                &UnitSchema,
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(res, CaESchema::Ok(SchemaId(1)));

        // A writer opened with the evolved schema picks up its id.
        let write1 = client
            .open_writer::<Strings, (), u64, i64>(
                shard_id,
                Arc::new(schema1),
                Arc::new(UnitSchema),
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(write1.write_schemas.id.unwrap(), SchemaId(1));
    }

    /// Extracts each key's string columns, for easy comparison.
    fn strings(xs: &[((Strings, ()), u64, i64)]) -> Vec<Vec<&str>> {
        xs.iter()
            .map(|((k, _), _, _)| k.0.iter().map(|x| x.as_str()).collect())
            .collect()
    }

    /// Writes with both an old and an evolved schema, then checks that readers
    /// on each schema see every row, shaped to their own schema.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn schema_evolution(dyncfgs: ConfigUpdates) {
        // Reads a sorted snapshot at `as_of` via `snapshot_and_stream`.
        async fn snap_streaming(
            as_of: u64,
            read: &mut ReadHandle<Strings, (), u64, i64>,
        ) -> Vec<((Strings, ()), u64, i64)> {
            let mut ret = read
                .snapshot_and_stream(Antichain::from_elem(as_of))
                .await
                .unwrap()
                .collect::<Vec<_>>()
                .await;
            ret.sort();
            ret
        }

        let client = new_test_client(&dyncfgs).await;
        let d = Diagnostics::for_tests();
        let shard_id = ShardId::new();
        let schema0 = StringsSchema(vec![false]);
        let schema1 = StringsSchema(vec![false, true]);

        // Open handles with the original one-column schema and write a row.
        let (mut write0, mut read0) = client
            .open::<Strings, (), u64, i64>(
                shard_id,
                Arc::new(schema0.clone()),
                Arc::new(UnitSchema),
                d.clone(),
                true,
            )
            .await
            .unwrap();
        write0
            .expect_compare_and_append(&[((Strings(vec!["0 before".into()]), ()), 0, 1)], 0, 1)
            .await;
        let expected = vec![vec!["0 before"]];
        assert_eq!(strings(&snap_streaming(0, &mut read0).await), expected);
        assert_eq!(strings(&read0.expect_snapshot_and_fetch(0).await), expected);

        // Evolve to a schema with an additional nullable column.
        let res = client
            .compare_and_evolve_schema::<Strings, (), u64, i64>(
                shard_id,
                SchemaId(0),
                &schema1,
                &UnitSchema,
                d.clone(),
            )
            .await
            .unwrap();
        assert_eq!(res, CaESchema::Ok(SchemaId(1)));
        // Write with the new schema: one row with the new column empty (null)
        // and one with it set.
        let (mut write1, mut read1) = client
            .open::<Strings, (), u64, i64>(
                shard_id,
                Arc::new(schema1.clone()),
                Arc::new(UnitSchema),
                d.clone(),
                true,
            )
            .await
            .unwrap();
        write1
            .expect_compare_and_append(
                &[
                    ((Strings(vec!["1 null".into(), "".into()]), ()), 1, 1),
                    ((Strings(vec!["1 not".into(), "x".into()]), ()), 1, 1),
                ],
                1,
                2,
            )
            .await;

        // The old writer keeps writing with the old schema after the evolution.
        write0
            .expect_compare_and_append(&[((Strings(vec!["0 after".into()]), ()), 2, 1)], 2, 3)
            .await;

        // A reader on the old schema sees every row, truncated to one column.
        let expected = vec![
            vec!["0 after"],
            vec!["0 before"],
            vec!["1 not"],
            vec!["1 null"],
        ];
        assert_eq!(strings(&snap_streaming(2, &mut read0).await), expected);
        assert_eq!(strings(&read0.expect_snapshot_and_fetch(2).await), expected);

        // A reader on the new schema sees every row; rows written with the old
        // schema get an empty second column.
        let expected = vec![
            vec!["0 after", ""],
            vec!["0 before", ""],
            vec!["1 not", "x"],
            vec!["1 null", ""],
        ];
        assert_eq!(strings(&snap_streaming(2, &mut read1).await), expected);
        assert_eq!(strings(&read1.expect_snapshot_and_fetch(2).await), expected);

        // Flip to `true` to log the non-zero metrics while debugging.
        if false {
            info_log_non_zero_metrics(&client.metrics.registry.gather());
        }
    }
}