mz_adapter/catalog/open/builtin_item_migration.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Migrations for builtin items.

use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

use futures::FutureExt;
use futures::future::BoxFuture;
use mz_adapter_types::dyncfgs::ENABLE_BUILTIN_MIGRATION_SCHEMA_EVOLUTION;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{BUILTINS, BuiltinTable, Fingerprint};
use mz_catalog::config::BuiltinItemMigrationConfig;
use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
use mz_catalog::durable::{
    DurableCatalogError, FenceError, SystemObjectDescription, SystemObjectMapping, Transaction,
};
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::CatalogItem;
use mz_ore::collections::CollectionExt;
use mz_ore::{halt, soft_assert_or_log, soft_panic_or_log};
use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::schema::CaESchema;
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient};
use mz_persist_types::ShardId;
use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
use mz_repr::{CatalogItemId, GlobalId, Timestamp};
use mz_sql::catalog::CatalogItem as _;
use mz_storage_client::controller::StorageTxn;
use mz_storage_types::StorageDiff;
use mz_storage_types::sources::SourceData;
use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
use tracing::{debug, error, info, warn};

use crate::catalog::open::builtin_item_migration::persist_schema::{TableKey, TableKeySchema};
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, CatalogState};
/// The results of a builtin item migration.
pub(crate) struct BuiltinItemMigrationResult {
    /// A vec of updates to apply to the builtin tables.
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    /// The IDs of migrated collections whose new shards may need to be initialized.
    pub(crate) migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// Some cleanup action to take once the migration has been made durable.
    pub(crate) cleanup_action: BoxFuture<'static, ()>,
}

/// Perform migrations for any builtin items that may have changed between versions.
///
/// We only need to do anything for items that have an associated storage collection. Others
/// (views, indexes) don't have any durable state that requires migration.
///
/// We have the ability to handle some backward-compatible schema changes through persist schema
/// evolution, and we do so when possible. For changes that schema evolution doesn't support, we
/// instead "migrate" the affected storage collections by creating new persist shards with the new
/// schemas and dropping the old ones. See [`migrate_builtin_collections_incompatible`] for
/// details.
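///
/// As a rough sketch of the decision flow (illustrative only, not exhaustive):
///
/// ```text
/// migrated builtin item
///   ├─ no storage collection (e.g. view, index)  => nothing to migrate
///   ├─ persist schema evolution succeeds         => evolve the shard schema in place
///   └─ otherwise                                 => replace the persist shard
/// ```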
pub(crate) async fn migrate_builtin_items(
    state: &mut CatalogState,
    txn: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    migrated_builtins: Vec<CatalogItemId>,
    BuiltinItemMigrationConfig {
        persist_client,
        read_only,
    }: BuiltinItemMigrationConfig,
) -> Result<BuiltinItemMigrationResult, Error> {
    assert_eq!(
        read_only,
        txn.is_savepoint(),
        "txn must be in savepoint mode when read_only is true, and in writable mode otherwise",
    );

    update_catalog_fingerprints(state, txn, &migrated_builtins)?;

    // Collect GlobalIds of storage collections we need to migrate.
    let mut collections_to_migrate: Vec<_> = migrated_builtins
        .into_iter()
        .filter_map(|id| {
            use CatalogItem::*;
            match &state.get_entry(&id).item() {
                Table(table) => Some(table.global_ids().into_element()),
                Source(source) => Some(source.global_id()),
                MaterializedView(mv) => Some(mv.global_id()),
                ContinualTask(ct) => Some(ct.global_id()),
                Log(_) | Sink(_) | View(_) | Index(_) | Type(_) | Func(_) | Secret(_)
                | Connection(_) => None,
            }
        })
        .collect();

    // Attempt to perform schema evolution.
    //
    // If we run into an unexpected error while trying to perform schema evolution, we abort the
    // migration process, rather than automatically falling back to replacing the persist shards.
    // We do this to avoid accidentally losing data due to a bug. This gives us the option to
    // decide whether we'd rather fix the bug or skip schema evolution using the dyncfg flag.
    if ENABLE_BUILTIN_MIGRATION_SCHEMA_EVOLUTION.get(state.system_config().dyncfgs()) {
        collections_to_migrate =
            try_evolve_persist_schemas(state, txn, collections_to_migrate, &persist_client).await?;
    } else {
        info!("skipping schema evolution for builtin item migration");
    }

    // For collections whose schemas we couldn't evolve, perform the replacement process.
    // Note that we need to invoke this process even if `collections_to_migrate` is empty, because
    // it also cleans up any leftovers of previous migrations from the migration shard.
    migrate_builtin_collections_incompatible(
        state,
        txn,
        local_expr_cache,
        persist_client,
        collections_to_migrate,
        read_only,
    )
    .await
}
129
130/// Update the durably stored fingerprints of `migrated_builtins`.
131fn update_catalog_fingerprints(
132    state: &CatalogState,
133    txn: &mut Transaction<'_>,
134    migrated_builtins: &[CatalogItemId],
135) -> Result<(), Error> {
136    let id_fingerprint_map: BTreeMap<_, _> = BUILTINS::iter(&state.config().builtins_cfg)
137        .map(|builtin| {
138            let id = state.resolve_builtin_object(builtin);
139            let fingerprint = builtin.fingerprint();
140            (id, fingerprint)
141        })
142        .collect();
143    let mut migrated_system_object_mappings = BTreeMap::new();
144    for item_id in migrated_builtins {
145        let fingerprint = id_fingerprint_map
146            .get(item_id)
147            .expect("missing fingerprint");
148        let entry = state.get_entry(item_id);
149        let schema_name = state
150            .get_schema(
151                &entry.name().qualifiers.database_spec,
152                &entry.name().qualifiers.schema_spec,
153                entry.conn_id().unwrap_or(&SYSTEM_CONN_ID),
154            )
155            .name
156            .schema
157            .as_str();
158        // Builtin Items can only be referenced by a single GlobalId.
159        let global_id = state.get_entry(item_id).global_ids().into_element();
160
161        migrated_system_object_mappings.insert(
162            *item_id,
163            SystemObjectMapping {
164                description: SystemObjectDescription {
165                    schema_name: schema_name.to_string(),
166                    object_type: entry.item_type(),
167                    object_name: entry.name().item.clone(),
168                },
169                unique_identifier: SystemObjectUniqueIdentifier {
170                    catalog_id: *item_id,
171                    global_id,
172                    fingerprint: fingerprint.clone(),
173                },
174            },
175        );
176    }
177    txn.update_system_object_mappings(migrated_system_object_mappings)?;
178
179    Ok(())
180}
181
182/// Attempt to migrate the given builtin collections using persist schema evolution.
183///
184/// Returns the IDs of collections for which schema evolution did not succeed, due to an "expected"
185/// error. At the moment, there are two expected reasons for schema evolution to fail: (a) the
186/// existing shard doesn't have a schema and (b) the new schema is incompatible with the existing
187/// schema.
188///
189/// If this method encounters an unexpected error, it returns an `Error` instead. The caller is
190/// expected to abort the migration process in response.
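///
/// As a rough illustration of case (b) (exactly which changes are compatible is persist's
/// decision and not specified here; the column types are made up):
///
/// ```text
/// old: (a int8, b text)    new: (a int8, b text, c text?)   => typically evolvable
/// old: (a int8, b text)    new: (a text, b text)            => Incompatible
/// ```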
async fn try_evolve_persist_schemas(
    state: &CatalogState,
    txn: &Transaction<'_>,
    migrated_storage_collections: Vec<GlobalId>,
    persist_client: &PersistClient,
) -> Result<Vec<GlobalId>, Error> {
    let collection_metadata = txn.get_collection_metadata();

    let mut failed = Vec::new();
    for id in migrated_storage_collections {
        let Some(&shard_id) = collection_metadata.get(&id) else {
            return Err(Error::new(ErrorKind::Internal(format!(
                "builtin migration: missing metadata for builtin collection {id}"
            ))));
        };

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: "migrate builtin schema".to_string(),
        };
        let Some((old_schema_id, old_schema, _)) = persist_client
            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
            .await
            .expect("invalid usage")
        else {
            // There are two known cases where it is expected to not find a latest schema:
            //   * The existing shard has never been written to previously.
            //   * We are running inside an upgrade check, with an in-memory `persist_client`.
            info!(%id, "builtin schema evolution failed: missing latest schema");
            failed.push(id);
            continue;
        };

        let entry = state.get_entry_by_global_id(&id);
        let Some(new_schema) = entry.desc_opt_latest() else {
            return Err(Error::new(ErrorKind::Internal(format!(
                "builtin migration: missing new schema for builtin collection {id}"
            ))));
        };

        info!(%id, ?old_schema, ?new_schema, "attempting builtin schema evolution");

        let result = persist_client
            .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                shard_id,
                old_schema_id,
                &new_schema,
                &UnitSchema,
                diagnostics,
            )
            .await
            .expect("invalid usage");

        match result {
            CaESchema::Ok(_) => {
                info!(%id, "builtin schema evolution succeeded");
            }
            CaESchema::Incompatible => {
                info!(%id, "builtin schema evolution failed: incompatible");
                failed.push(id);
            }
            CaESchema::ExpectedMismatch { schema_id, .. } => {
                return Err(Error::new(ErrorKind::Internal(format!(
                    "builtin migration: unexpected schema mismatch ({} != {})",
                    schema_id, old_schema_id,
                ))));
            }
        }
    }

    Ok(failed)
}

/// Migrate builtin collections whose schema changes are not supported by persist schema
/// evolution.
///
/// The high-level description of this approach is that we create a new shard for each migrated
/// builtin collection with the new schema, without changing the global ID. Dependent objects are
/// not modified but now read from the new shards.
///
/// A detailed description of this approach follows. It's important that all of these steps are
/// idempotent, so that we can safely crash at any point, and so that runs without a pending
/// upgrade turn into a no-op.
///
///    1. Each environment has a dedicated persist shard, called the migration shard, that allows
///       environments to durably write down metadata while in read-only mode. The shard is a
///       mapping of `(GlobalId, build_version)` to `ShardId`.
///    2. Read in the current contents of the migration shard.
///    3. Collect all the `ShardId`s from the migration shard that are not at the current
///       `build_version` or are not in the set of migrated collections.
///       a. If they ARE NOT mapped to a `GlobalId` in the storage metadata, then they are shards
///          from an incomplete migration. Finalize them and remove them from the migration shard.
///          Note: care must be taken to not remove a shard from the migration shard until we are
///          sure that it will be finalized; otherwise the shard will leak.
///       b. If they ARE mapped to a `GlobalId` in the storage metadata, then they are shards from
///          a complete migration. Remove them from the migration shard.
///    4. Collect all the `GlobalId`s of collections that are migrated, but not in the migration
///       shard for the current build version. Generate new `ShardId`s and add them to the
///       migration shard.
///    5. At this point the migration shard should only logically contain a mapping of migrated
///       collection `GlobalId`s to new `ShardId`s for the current build version. For each of
///       these `GlobalId`s such that the `ShardId` isn't already in the storage metadata:
///       a. Remove the current `GlobalId` to `ShardId` mapping from the storage metadata.
///       b. Finalize the removed `ShardId`s.
///       c. Insert the new `GlobalId` to `ShardId` mapping into the storage metadata.
///
/// Steps 3 through 5 are illustrated with a worked example at the end of this comment.
///
/// This approach breaks the abstraction boundary between the catalog and the storage metadata,
/// but rare, extremely useful abstraction breaks like this one are the exact reason the two are
/// co-located.
///
/// While in read-only mode, the new shards are created but left empty, so all dependent items
/// will fail to hydrate.
///
/// While in read-only mode we write the migration changes to `txn`, which will update the
/// in-memory catalog, which will cause the new shards to be created in storage. However, we don't
/// have to worry about the catalog changes becoming durable because the `txn` is in savepoint
/// mode. When we re-execute this migration as the leader (i.e. outside of read-only mode), `txn`
/// will be writable and the migration will be made durable in the catalog. We always write
/// directly to the migration shard, regardless of read-only mode. So we have to be careful not to
/// remove anything from the migration shard until we're sure that its results have been made
/// durable elsewhere.
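///
/// As a concrete illustration of steps 3 through 5, with entirely made-up IDs: suppose we are
/// upgrading to build version `0.2.0`, collection `u42` is migrated again in `0.2.0`, and the
/// migration shard contains leftovers from a `0.1.0` migration:
///
/// ```text
/// (u42, 0.1.0) -> s111   // storage metadata maps u42 to s111: completed migration (3b),
///                        // so just remove the entry from the migration shard
/// (u99, 0.1.0) -> s222   // not in the storage metadata: incomplete migration (3a),
///                        // so finalize s222, then remove the entry
/// ```
///
/// Step 4 generates a fresh shard and writes `(u42, 0.2.0) -> s333` to the migration shard.
/// Step 5 then deletes the `u42 -> s111` mapping from the storage metadata, registers `s111`
/// for finalization, and inserts `u42 -> s333` in its place.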
async fn migrate_builtin_collections_incompatible(
    state: &mut CatalogState,
    txn: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    persist_client: PersistClient,
    migrated_storage_collections: Vec<GlobalId>,
    read_only: bool,
) -> Result<BuiltinItemMigrationResult, Error> {
    let build_version = state.config.build_info.semver_version();

    // The migration shard only stores raw GlobalIds, so it's more convenient to keep the list of
    // migrated collections in that form.
    let migrated_storage_collections: Vec<_> = migrated_storage_collections
        .into_iter()
        .map(|gid| match gid {
            GlobalId::System(raw) => raw,
            _ => panic!("builtins must have system IDs"),
        })
        .collect();

    // 1. Open the migration shard.
    let organization_id = state.config.environment_id.organization_id();
    let shard_id = txn
        .get_builtin_migration_shard()
        .expect("builtin migration shard should exist for opened catalogs");
    let diagnostics = Diagnostics {
        shard_name: "builtin_migration".to_string(),
        handle_purpose: format!(
            "builtin table migration shard for org {organization_id:?} version {build_version:?}"
        ),
    };
    let mut since_handle: SinceHandle<TableKey, ShardId, Timestamp, StorageDiff, i64> =
        persist_client
            .open_critical_since(
                shard_id,
                // TODO: We may need to use a different critical reader
                // id for this if we want to be able to introspect it via SQL.
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                diagnostics.clone(),
            )
            .await
            .expect("invalid usage");
    let (mut write_handle, mut read_handle): (
        WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
        ReadHandle<TableKey, ShardId, Timestamp, StorageDiff>,
    ) = persist_client
        .open(
            shard_id,
            Arc::new(TableKeySchema),
            Arc::new(ShardIdSchema),
            diagnostics,
            USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
        )
        .await
        .expect("invalid usage");
    // Commit an empty write at the minimum timestamp so the shard is always readable.
    const EMPTY_UPDATES: &[((TableKey, ShardId), Timestamp, StorageDiff)] = &[];
    let res = write_handle
        .compare_and_append(
            EMPTY_UPDATES,
            Antichain::from_elem(Timestamp::minimum()),
            Antichain::from_elem(Timestamp::minimum().step_forward()),
        )
        .await
        .expect("invalid usage");
    if let Err(e) = res {
        debug!("migration shard already initialized: {e:?}");
    }

    // 2. Read in the current contents of the migration shard.
    // We intentionally fetch the upper AFTER opening the read handle to address races between
    // the upper and since moving forward in some other process.
    let upper = fetch_upper(&mut write_handle).await;
    // The empty write above should ensure that the upper is at least 1.
    let as_of = upper.checked_sub(1).ok_or_else(|| {
        Error::new(ErrorKind::Internal(format!(
            "builtin migration failed, unexpected upper: {upper:?}"
        )))
    })?;
    let since = read_handle.since();
    assert!(
        since.less_equal(&as_of),
        "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
    );
    let as_of = Antichain::from_elem(as_of);
    let snapshot = read_handle
        .snapshot_and_fetch(as_of)
        .await
        .expect("as_of is at least the since, per the assert above");
    soft_assert_or_log!(
        snapshot.iter().all(|(_, _, diff)| *diff == 1),
        "snapshot_and_fetch guarantees a consolidated result: {snapshot:?}"
    );
    let mut global_id_shards: BTreeMap<_, _> = snapshot
        .into_iter()
        .filter_map(|(data, _ts, _diff)| {
            if let (Ok(table_key), Ok(shard_id)) = data {
                Some((table_key, shard_id))
            } else {
                // If we can't decode the data, it has likely been written by a newer version, so
                // we ignore it.
                warn!("skipping unreadable migration shard entry: {data:?}");
                None
            }
        })
        .collect();

    // 3. Clean up the contents of the migration shard.
    let mut migrated_shard_updates: Vec<((TableKey, ShardId), Timestamp, StorageDiff)> = Vec::new();
    let mut migration_shards_to_finalize = BTreeSet::new();
    let storage_collection_metadata = txn.get_collection_metadata();
    for (table_key, shard_id) in global_id_shards.clone() {
        if table_key.build_version > build_version {
            if read_only {
                halt!(
                    "saw build version {}, which is greater than current build version {}",
                    table_key.build_version,
                    build_version
                );
            } else {
                // If we are in leader mode, and a newer (read-only) version has started a
                // migration, we must not allow ourselves to get fenced out! Continuing here might
                // confuse any read-only process running the migrations concurrently, but it's
                // better for the read-only env to crash than the leader.
                // TODO(#9755): handle this in a more principled way
                warn!(
                    %table_key.build_version, %build_version,
                    "saw build version which is greater than current build version",
                );
                global_id_shards.remove(&table_key);
                continue;
            }
        }

        if !migrated_storage_collections.contains(&table_key.global_id)
            || table_key.build_version < build_version
        {
            global_id_shards.remove(&table_key);
            if storage_collection_metadata.get(&GlobalId::System(table_key.global_id))
                == Some(&shard_id)
            {
                migrated_shard_updates.push(((table_key, shard_id.clone()), upper, -1));
            } else {
                migration_shards_to_finalize.insert((table_key, shard_id));
            }
        }
    }

    // 4. Add migrated tables to the migration shard for the current build version.
    let mut global_id_shards: BTreeMap<_, _> = global_id_shards
        .into_iter()
        .map(|(table_key, shard_id)| (table_key.global_id, shard_id))
        .collect();
    for global_id in migrated_storage_collections {
        if !global_id_shards.contains_key(&global_id) {
            let shard_id = ShardId::new();
            global_id_shards.insert(global_id, shard_id);
            let table_key = TableKey {
                global_id,
                build_version: build_version.clone(),
            };
            migrated_shard_updates.push(((table_key, shard_id), upper, 1));
        }
    }

    // It's very important that we use the same `upper` that was used to read in a snapshot of the
    // shard. If someone updated the shard after we read it, then this write will fail.
    let upper = if !migrated_shard_updates.is_empty() {
        write_to_migration_shard(
            migrated_shard_updates,
            upper,
            &mut write_handle,
            &mut since_handle,
        )
        .await?
    } else {
        upper
    };

    // 5. Update the `GlobalId` to `ShardId` mapping and register the old `ShardId`s for
    // finalization. We don't do the finalization here and instead rely on the background
    // finalization task.
    let migrated_storage_collections_0dt = {
        let txn: &mut dyn StorageTxn<Timestamp> = txn;
        let storage_collection_metadata = txn.get_collection_metadata();
        let global_id_shards: BTreeMap<_, _> = global_id_shards
            .into_iter()
            .map(|(global_id, shard_id)| (GlobalId::System(global_id), shard_id))
            .filter(|(global_id, shard_id)| {
                storage_collection_metadata.get(global_id) != Some(shard_id)
            })
            .collect();
        let global_ids: BTreeSet<_> = global_id_shards.keys().cloned().collect();
        let mut old_shard_ids: BTreeSet<_> = txn
            .delete_collection_metadata(global_ids.clone())
            .into_iter()
            .map(|(_, shard_id)| shard_id)
            .collect();
        old_shard_ids.extend(
            migration_shards_to_finalize
                .iter()
                .map(|(_, shard_id)| shard_id),
        );
        txn.insert_unfinalized_shards(old_shard_ids).map_err(|e| {
            Error::new(ErrorKind::Internal(format!(
                "builtin migration failed: {e}"
            )))
        })?;
        txn.insert_collection_metadata(global_id_shards)
            .map_err(|e| {
                Error::new(ErrorKind::Internal(format!(
                    "builtin migration failed: {e}"
                )))
            })?;
        global_ids
    };

    // 6. Map the migrated `GlobalId`s to their corresponding `CatalogItemId`s.
    let migrated_storage_collections_0dt = migrated_storage_collections_0dt
        .into_iter()
        .map(|gid| state.get_entry_by_global_id(&gid).id())
        .collect();

    let updates = txn.get_and_commit_op_updates();
    let builtin_table_updates = state
        .apply_updates_for_bootstrap(updates, local_expr_cache)
        .await;

    let cleanup_action = async move {
        if !read_only {
            let updates: Vec<_> = migration_shards_to_finalize
                .into_iter()
                .map(|(table_key, shard_id)| ((table_key, shard_id), upper, -1))
                .collect();
            if !updates.is_empty() {
                // Ignore any errors; these shards will get cleaned up in the next upgrade.
                // It's important to use `upper` here. If there was another concurrent write at
                // `upper`, then `updates` are no longer valid.
                let res =
                    write_to_migration_shard(updates, upper, &mut write_handle, &mut since_handle)
                        .await;
                if let Err(e) = res {
                    error!("unable to remove old entries from migration shard: {e:?}");
                }
            }
        }
    }
    .boxed();

    Ok(BuiltinItemMigrationResult {
        builtin_table_updates,
        migrated_storage_collections_0dt,
        cleanup_action,
    })
}

/// Fetch the current upper of the migration shard.
///
/// The shard uses a totally ordered time and is never finalized, so its upper is always a single
/// timestamp.
async fn fetch_upper(
    write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
) -> Timestamp {
    write_handle
        .fetch_recent_upper()
        .await
        .as_option()
        .cloned()
        .expect("we use a totally ordered time and never finalize the shard")
}

/// Append `updates` to the migration shard at `upper` and downgrade the shard's since to lag the
/// new upper by 1, keeping the shard readable.
///
/// Returns the new upper on success, or a fence error if another process wrote to the shard
/// concurrently.
async fn write_to_migration_shard(
    updates: Vec<((TableKey, ShardId), Timestamp, StorageDiff)>,
    upper: Timestamp,
    write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
    since_handle: &mut SinceHandle<TableKey, ShardId, Timestamp, StorageDiff, i64>,
) -> Result<Timestamp, Error> {
    let next_upper = upper.step_forward();
    // Lag the shard's upper by 1 to keep it readable.
    let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));
    let next_upper_antichain = Antichain::from_elem(next_upper);

    if let Err(err) = write_handle
        .compare_and_append(updates, Antichain::from_elem(upper), next_upper_antichain)
        .await
        .expect("invalid usage")
    {
        return Err(Error::new(ErrorKind::Durable(DurableCatalogError::Fence(
            FenceError::migration(err),
        ))));
    }

    // The since handle gives us the ability to fence out other downgraders using an opaque token.
    // (See the method documentation for details.)
    // That's not needed here, so we pass the since handle's current opaque token to avoid any
    // comparison failures.
    let opaque = *since_handle.opaque();
    let downgrade = since_handle
        .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
        .await;
    match downgrade {
        None => {}
        Some(Err(e)) => soft_panic_or_log!("found opaque value {e}, but expected {opaque}"),
        Some(Ok(updated)) => soft_assert_or_log!(
            updated == downgrade_to,
            "updated bound ({updated:?}) should match expected ({downgrade_to:?})"
        ),
    }

    Ok(next_upper)
}

/// Persist codec and schema implementations for the migration shard's key type.
mod persist_schema {
    use std::num::ParseIntError;

    use arrow::array::{StringArray, StringBuilder};
    use bytes::{BufMut, Bytes};
    use mz_persist_types::Codec;
    use mz_persist_types::codec_impls::{
        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
    };
    use mz_persist_types::columnar::Schema;
    use mz_persist_types::stats::NoneStats;

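    /// Key type for entries in the builtin migration shard: a builtin collection's raw system
    /// `GlobalId` paired with the build version that wrote the entry.
    ///
    /// Keys round-trip through the string form `"<global_id>-<build_version>"`. A minimal sketch
    /// with a made-up ID and version (note that `FromStr` splits on the first `-` only, so the
    /// version itself may contain `-`, e.g. for pre-release builds):
    ///
    /// ```text
    /// let key = TableKey { global_id: 42, build_version: semver::Version::new(0, 127, 0) };
    /// assert_eq!(key.to_string(), "42-0.127.0");
    /// assert_eq!("42-0.127.0".parse::<TableKey>().unwrap(), key);
    /// ```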
    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)]
    pub(super) struct TableKey {
        pub(super) global_id: u64,
        pub(super) build_version: semver::Version,
    }

    impl std::fmt::Display for TableKey {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "{}-{}", self.global_id, self.build_version)
        }
    }

    impl std::str::FromStr for TableKey {
        type Err = String;

        fn from_str(s: &str) -> Result<Self, Self::Err> {
            let parts: Vec<_> = s.splitn(2, '-').collect();
            let &[global_id, build_version] = parts.as_slice() else {
                return Err(format!("invalid TableKey '{s}'"));
            };
            let global_id = global_id
                .parse()
                .map_err(|e: ParseIntError| e.to_string())?;
            let build_version = build_version
                .parse()
                .map_err(|e: semver::Error| e.to_string())?;
            Ok(TableKey {
                global_id,
                build_version,
            })
        }
    }

    impl From<TableKey> for String {
        fn from(table_key: TableKey) -> Self {
            table_key.to_string()
        }
    }

    impl TryFrom<String> for TableKey {
        type Error = String;

        fn try_from(s: String) -> Result<Self, Self::Error> {
            s.parse()
        }
    }

    impl Default for TableKey {
        fn default() -> Self {
            Self {
                global_id: Default::default(),
                build_version: semver::Version::new(0, 0, 0),
            }
        }
    }

    impl Codec for TableKey {
        type Storage = ();
        type Schema = TableKeySchema;
        fn codec_name() -> String {
            "TableKey".into()
        }
        fn encode<B: BufMut>(&self, buf: &mut B) {
            buf.put(self.to_string().as_bytes())
        }
        fn decode<'a>(buf: &'a [u8], _schema: &TableKeySchema) -> Result<Self, String> {
            let table_key = String::from_utf8(buf.to_owned()).map_err(|err| err.to_string())?;
            table_key.parse()
        }
        fn encode_schema(_schema: &Self::Schema) -> Bytes {
            Bytes::new()
        }
        fn decode_schema(buf: &Bytes) -> Self::Schema {
            assert_eq!(*buf, Bytes::new());
            TableKeySchema
        }
    }

    impl SimpleColumnarData for TableKey {
        type ArrowBuilder = StringBuilder;
        type ArrowColumn = StringArray;

        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
            builder.values_slice().len()
        }

        fn push(&self, builder: &mut Self::ArrowBuilder) {
            builder.append_value(&self.to_string());
        }
        fn push_null(builder: &mut Self::ArrowBuilder) {
            builder.append_null();
        }
        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
            *self = column.value(idx).parse().expect("should be valid TableKey");
        }
    }

    /// An implementation of [Schema] for [TableKey].
    #[derive(Debug, PartialEq)]
    pub(super) struct TableKeySchema;

    impl Schema<TableKey> for TableKeySchema {
        type ArrowColumn = StringArray;
        type Statistics = NoneStats;

        type Decoder = SimpleColumnarDecoder<TableKey>;
        type Encoder = SimpleColumnarEncoder<TableKey>;

        fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
            Ok(SimpleColumnarEncoder::default())
        }

        fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
            Ok(SimpleColumnarDecoder::new(col))
        }
    }
}