mz_adapter/catalog/open/builtin_item_migration.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Migrations for builtin items.

use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

use futures::FutureExt;
use futures::future::BoxFuture;
use mz_adapter_types::dyncfgs::ENABLE_BUILTIN_MIGRATION_SCHEMA_EVOLUTION;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{BUILTINS, BuiltinTable, Fingerprint};
use mz_catalog::config::BuiltinItemMigrationConfig;
use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
use mz_catalog::durable::{
    DurableCatalogError, FenceError, SystemObjectDescription, SystemObjectMapping, Transaction,
};
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::CatalogItem;
use mz_ore::collections::CollectionExt;
use mz_ore::{halt, soft_assert_or_log, soft_panic_or_log};
use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::schema::CaESchema;
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient};
use mz_persist_types::ShardId;
use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
use mz_repr::{CatalogItemId, GlobalId, Timestamp};
use mz_sql::catalog::CatalogItem as _;
use mz_storage_client::controller::StorageTxn;
use mz_storage_types::StorageDiff;
use mz_storage_types::sources::SourceData;
use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
use tracing::{debug, error, info};

use crate::catalog::open::builtin_item_migration::persist_schema::{TableKey, TableKeySchema};
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, CatalogState};

/// The results of a builtin item migration.
pub(crate) struct BuiltinItemMigrationResult {
    /// A vec of updates to apply to the builtin tables.
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    /// A set of new shards that may need to be initialized.
    pub(crate) migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// Some cleanup action to take once the migration has been made durable.
    pub(crate) cleanup_action: BoxFuture<'static, ()>,
}

/// Perform migrations for any builtin items that may have changed between versions.
///
/// We only need to do anything for items that have an associated storage collection. Others
/// (views, indexes) don't have any durable state that requires migration.
///
/// We have the ability to handle some backward-compatible schema changes through persist schema
/// evolution, and we do so when possible. For changes that schema evolution doesn't support, we
/// instead "migrate" the affected storage collections by creating new persist shards with the new
/// schemas and dropping the old ones. See [`migrate_builtin_collections_incompatible`] for
/// details.
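///
/// A simplified sketch of the control flow (helper names and signatures reduced to the
/// essentials; not the real API):
///
/// ```ignore
/// let todo = storage_backed(migrated_builtins);           // tables, sources, MVs, CTs
/// let todo = if schema_evolution_enabled {
///     try_evolve_persist_schemas(todo).await?             // returns what it couldn't evolve
/// } else {
///     todo
/// };
/// migrate_builtin_collections_incompatible(todo).await    // replaces the remaining shards
/// ```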
pub(crate) async fn migrate_builtin_items(
    state: &mut CatalogState,
    txn: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    migrated_builtins: Vec<CatalogItemId>,
    BuiltinItemMigrationConfig {
        persist_client,
        read_only,
    }: BuiltinItemMigrationConfig,
) -> Result<BuiltinItemMigrationResult, Error> {
    assert_eq!(
        read_only,
        txn.is_savepoint(),
        "txn must be in savepoint mode when read_only is true, and in writable mode otherwise",
    );

    update_catalog_fingerprints(state, txn, &migrated_builtins)?;

    // Collect GlobalIds of storage collections we need to migrate.
    let mut collections_to_migrate: Vec<_> = migrated_builtins
        .into_iter()
        .filter_map(|id| {
            use CatalogItem::*;
            match &state.get_entry(&id).item() {
                Table(table) => Some(table.global_ids().into_element()),
                Source(source) => Some(source.global_id()),
                MaterializedView(mv) => Some(mv.global_id()),
                ContinualTask(ct) => Some(ct.global_id()),
                Log(_) | Sink(_) | View(_) | Index(_) | Type(_) | Func(_) | Secret(_)
                | Connection(_) => None,
            }
        })
        .collect();

    // Attempt to perform schema evolution.
    //
    // If we run into an unexpected error while trying to perform schema evolution, we abort the
    // migration process, rather than automatically falling back to replacing the persist shards.
    // We do this to avoid accidentally losing data due to a bug. This gives us the option to
    // decide if we'd rather fix the bug or skip schema evolution using the dyncfg flag.
    if ENABLE_BUILTIN_MIGRATION_SCHEMA_EVOLUTION.get(state.system_config().dyncfgs()) {
        collections_to_migrate =
            try_evolve_persist_schemas(state, txn, collections_to_migrate, &persist_client).await?;
    } else {
        info!("skipping builtin migration by schema evolution");
    }

    // For collections whose schemas we couldn't evolve, perform the replacement process.
    // Note that we need to invoke this process even if `collections_to_migrate` is empty because
    // it also cleans up any leftovers of previous migrations from the migration shard.
    migrate_builtin_collections_incompatible(
        state,
        txn,
        local_expr_cache,
        persist_client,
        collections_to_migrate,
        read_only,
    )
    .await
}

/// Update the durably stored fingerprints of `migrated_builtins`.
fn update_catalog_fingerprints(
    state: &CatalogState,
    txn: &mut Transaction<'_>,
    migrated_builtins: &[CatalogItemId],
) -> Result<(), Error> {
    let id_fingerprint_map: BTreeMap<_, _> = BUILTINS::iter(&state.config().builtins_cfg)
        .map(|builtin| {
            let id = state.resolve_builtin_object(builtin);
            let fingerprint = builtin.fingerprint();
            (id, fingerprint)
        })
        .collect();
    let mut migrated_system_object_mappings = BTreeMap::new();
    for item_id in migrated_builtins {
        let fingerprint = id_fingerprint_map
            .get(item_id)
            .expect("missing fingerprint");
        let entry = state.get_entry(item_id);
        let schema_name = state
            .get_schema(
                &entry.name().qualifiers.database_spec,
                &entry.name().qualifiers.schema_spec,
                entry.conn_id().unwrap_or(&SYSTEM_CONN_ID),
            )
            .name
            .schema
            .as_str();
        // Builtin items can only be referenced by a single GlobalId.
        let global_id = state.get_entry(item_id).global_ids().into_element();

        migrated_system_object_mappings.insert(
            *item_id,
            SystemObjectMapping {
                description: SystemObjectDescription {
                    schema_name: schema_name.to_string(),
                    object_type: entry.item_type(),
                    object_name: entry.name().item.clone(),
                },
                unique_identifier: SystemObjectUniqueIdentifier {
                    catalog_id: *item_id,
                    global_id,
                    fingerprint: fingerprint.clone(),
                },
            },
        );
    }
    txn.update_system_object_mappings(migrated_system_object_mappings)?;

    Ok(())
}

/// Attempt to migrate the given builtin collections using persist schema evolution.
///
/// Returns the IDs of collections for which schema evolution did not succeed, due to an "expected"
/// error. At the moment, there are two expected reasons for schema evolution to fail: (a) the
/// existing shard doesn't have a schema and (b) the new schema is incompatible with the existing
/// schema.
///
/// If this method encounters an unexpected error, it returns an `Error` instead. The caller is
/// expected to abort the migration process in response.
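///
/// In terms of the [`CaESchema`] result returned by persist: `Ok` means the shard now accepts
/// the new schema; `Incompatible` is an expected failure, and the collection is handed back for
/// shard replacement; `ExpectedMismatch` means the schema changed concurrently, which we treat
/// as an unexpected error.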
async fn try_evolve_persist_schemas(
    state: &CatalogState,
    txn: &Transaction<'_>,
    migrated_storage_collections: Vec<GlobalId>,
    persist_client: &PersistClient,
) -> Result<Vec<GlobalId>, Error> {
    let collection_metadata = txn.get_collection_metadata();

    let mut failed = Vec::new();
    for id in migrated_storage_collections {
        let Some(&shard_id) = collection_metadata.get(&id) else {
            return Err(Error::new(ErrorKind::Internal(format!(
                "builtin migration: missing metadata for builtin collection {id}"
            ))));
        };

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: "migrate builtin schema".to_string(),
        };
        let Some((old_schema_id, old_schema, _)) = persist_client
            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
            .await
            .expect("invalid usage")
        else {
            // There are two known cases where it is expected to not find a latest schema:
            //   * The existing shard has never been written to previously.
            //   * We are running inside an upgrade check, with an in-memory `persist_client`.
            info!(%id, "builtin schema evolution failed: missing latest schema");
            failed.push(id);
            continue;
        };

        let entry = state.get_entry_by_global_id(&id);
        let Some(new_schema) = entry.desc_opt_latest() else {
            return Err(Error::new(ErrorKind::Internal(format!(
                "builtin migration: missing new schema for builtin collection {id}"
            ))));
        };

        info!(%id, ?old_schema, ?new_schema, "attempting builtin schema evolution");

        let result = persist_client
            .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                shard_id,
                old_schema_id,
                &new_schema,
                &UnitSchema,
                diagnostics,
            )
            .await
            .expect("invalid usage");

        match result {
            CaESchema::Ok(_) => {
                info!("builtin schema evolution succeeded");
            }
            CaESchema::Incompatible => {
                info!("builtin schema evolution failed: incompatible");
                failed.push(id);
            }
            CaESchema::ExpectedMismatch { schema_id, .. } => {
                return Err(Error::new(ErrorKind::Internal(format!(
                    "builtin migration: unexpected schema mismatch ({} != {})",
                    schema_id, old_schema_id,
                ))));
            }
        }
    }

    Ok(failed)
}

/// Migrate builtin collections that are not supported by persist schema evolution.
///
/// The high level description of this approach is that we create new shards for each migrated
/// builtin collection with the new schema, without changing the global ID. Dependent objects are
/// not modified but now read from the new shards.
///
/// A detailed description of this approach follows. It's important that all of these steps are
/// idempotent, so that we can safely crash at any point, and restarts without an upgrade turn
/// into no-ops.
///
///    1. Each environment has a dedicated persist shard, called the migration shard, that allows
///       environments to durably write down metadata while in read-only mode. The shard is a
///       mapping of `(GlobalId, build_version)` to `ShardId`.
///    2. Read in the current contents of the migration shard.
///    3. Collect all the `ShardId`s from the migration shard that are not at the current
///       `build_version` or are not in the set of migrated collections.
///       a. If they ARE NOT mapped to a `GlobalId` in the storage metadata then they are shards
///          from an incomplete migration. Finalize them and remove them from the migration shard.
///          Note: care must be taken to not remove the shard from the migration shard until we are
///          sure that they will be finalized, otherwise the shard will leak.
///       b. If they ARE mapped to a `GlobalId` in the storage metadata then they are shards from a
///          complete migration. Remove them from the migration shard.
///    4. Collect all the `GlobalId`s of collections that are migrated, but not in the migration
///       shard for the current build version. Generate new `ShardId`s and add them to the
///       migration shard.
///    5. At this point the migration shard should only logically contain a mapping of migrated
///       collection `GlobalId`s to new `ShardId`s for the current build version. For each of these
///       `GlobalId`s such that the `ShardId` isn't already in the storage metadata:
///       a. Remove the current `GlobalId` to `ShardId` mapping from the storage metadata.
///       b. Finalize the removed `ShardId`s.
///       c. Insert the new `GlobalId` to `ShardId` mapping into the storage metadata.
///
/// This approach breaks the abstraction boundary between the catalog and the storage metadata,
/// but rare, extremely useful abstraction breaks like this are the exact reason the two are
/// co-located.
///
/// Since the new shards are created in read-only mode, they will be left empty and all dependent
/// items will fail to hydrate.
///
/// While in read-only mode we write the migration changes to `txn`, which will update the
/// in-memory catalog, which will cause the new shards to be created in storage. However, we don't
/// have to worry about the catalog changes becoming durable because the `txn` is in savepoint
/// mode. When we re-execute this migration as the leader (i.e. outside of read-only mode), `txn`
/// will be writable and the migration will be made durable in the catalog. We always write
/// directly to the migration shard, regardless of read-only mode. So we have to be careful not to
/// remove anything from the migration shard until we're sure that its results have been made
/// durable elsewhere.
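///
/// For intuition, here is a hypothetical snapshot of the migration shard during an upgrade to
/// build version 0.2.0 (all IDs invented):
///
/// ```text
/// (u101, 0.1.0) -> sA   // old version, mapping present in storage metadata: finished
///                       //   migration, just remove the entry (step 3.b)
/// (u102, 0.1.0) -> sB   // old version, mapping absent: incomplete migration, finalize
///                       //   sB and then remove the entry (step 3.a)
/// (u103, 0.2.0) -> sC   // current version: swap sC into the storage metadata (step 5)
/// ```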
async fn migrate_builtin_collections_incompatible(
    state: &mut CatalogState,
    txn: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    persist_client: PersistClient,
    migrated_storage_collections: Vec<GlobalId>,
    read_only: bool,
) -> Result<BuiltinItemMigrationResult, Error> {
    let build_version = state.config.build_info.semver_version();

    // The migration shard only stores raw GlobalIds, so it's more convenient to keep the list of
    // migrated collections in that form.
    let migrated_storage_collections: Vec<_> = migrated_storage_collections
        .into_iter()
        .map(|gid| match gid {
            GlobalId::System(raw) => raw,
            _ => panic!("builtins must have system IDs"),
        })
        .collect();

    // 1. Open migration shard.
    let organization_id = state.config.environment_id.organization_id();
    let shard_id = txn
        .get_builtin_migration_shard()
        .expect("builtin migration shard should exist for opened catalogs");
    let diagnostics = Diagnostics {
        shard_name: "builtin_migration".to_string(),
        handle_purpose: format!(
            "builtin table migration shard for org {organization_id:?} version {build_version:?}"
        ),
    };
    let mut since_handle: SinceHandle<TableKey, ShardId, Timestamp, StorageDiff, i64> =
        persist_client
            .open_critical_since(
                shard_id,
                // TODO: We may need to use a different critical reader
                // id for this if we want to be able to introspect it via SQL.
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                diagnostics.clone(),
            )
            .await
            .expect("invalid usage");
    let (mut write_handle, mut read_handle): (
        WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
        ReadHandle<TableKey, ShardId, Timestamp, StorageDiff>,
    ) = persist_client
        .open(
            shard_id,
            Arc::new(TableKeySchema),
            Arc::new(ShardIdSchema),
            diagnostics,
            USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
        )
        .await
        .expect("invalid usage");
    // Commit an empty write at the minimum timestamp so the shard is always readable.
    const EMPTY_UPDATES: &[((TableKey, ShardId), Timestamp, StorageDiff)] = &[];
    let res = write_handle
        .compare_and_append(
            EMPTY_UPDATES,
            Antichain::from_elem(Timestamp::minimum()),
            Antichain::from_elem(Timestamp::minimum().step_forward()),
        )
        .await
        .expect("invalid usage");
    if let Err(e) = res {
        debug!("migration shard already initialized: {e:?}");
    }

    // 2. Read in the current contents of the migration shard.
    // We intentionally fetch the upper AFTER opening the read handle to address races between
    // the upper and since moving forward in some other process.
    let upper = fetch_upper(&mut write_handle).await;
    // The empty write above should ensure that the upper is at least 1.
    let as_of = upper.checked_sub(1).ok_or_else(|| {
        Error::new(ErrorKind::Internal(format!(
            "builtin migration failed, unexpected upper: {upper:?}"
        )))
    })?;
    let since = read_handle.since();
    assert!(
        since.less_equal(&as_of),
        "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
    );
    let as_of = Antichain::from_elem(as_of);
    let snapshot = read_handle
        .snapshot_and_fetch(as_of)
        .await
        .expect("as_of is readable: since <= as_of was asserted above");
    soft_assert_or_log!(
        snapshot.iter().all(|(_, _, diff)| *diff == 1),
        "snapshot_and_fetch guarantees a consolidated result: {snapshot:?}"
    );
    let mut global_id_shards: BTreeMap<_, _> = snapshot
        .into_iter()
        .map(|((key, value), _ts, _diff)| {
            let table_key = key.expect("persist decoding error");
            let shard_id = value.expect("persist decoding error");
            (table_key, shard_id)
        })
        .collect();

    // 3. Clean up contents of migration shard.
    let mut migrated_shard_updates: Vec<((TableKey, ShardId), Timestamp, StorageDiff)> = Vec::new();
    let mut migration_shards_to_finalize = BTreeSet::new();
    let storage_collection_metadata = txn.get_collection_metadata();
    for (table_key, shard_id) in global_id_shards.clone() {
        if table_key.build_version > build_version {
            halt!(
                "saw build version {}, which is greater than current build version {}",
                table_key.build_version,
                build_version
            );
        }

        if !migrated_storage_collections.contains(&table_key.global_id)
            || table_key.build_version < build_version
        {
            global_id_shards.remove(&table_key);
            if storage_collection_metadata.get(&GlobalId::System(table_key.global_id))
                == Some(&shard_id)
            {
                migrated_shard_updates.push(((table_key, shard_id.clone()), upper, -1));
            } else {
                migration_shards_to_finalize.insert((table_key, shard_id));
            }
        }
    }

    // 4. Add migrated tables to migration shard for current build version.
    let mut global_id_shards: BTreeMap<_, _> = global_id_shards
        .into_iter()
        .map(|(table_key, shard_id)| (table_key.global_id, shard_id))
        .collect();
    for global_id in migrated_storage_collections {
        if !global_id_shards.contains_key(&global_id) {
            let shard_id = ShardId::new();
            global_id_shards.insert(global_id, shard_id);
            let table_key = TableKey {
                global_id,
                build_version: build_version.clone(),
            };
            migrated_shard_updates.push(((table_key, shard_id), upper, 1));
        }
    }

    // It's very important that we use the same `upper` that was used to read in a snapshot of the
    // shard. If someone updated the shard after we read, then this write will fail.
    let upper = if !migrated_shard_updates.is_empty() {
        write_to_migration_shard(
            migrated_shard_updates,
            upper,
            &mut write_handle,
            &mut since_handle,
        )
        .await?
    } else {
        upper
    };

    // 5. Update `GlobalId` to `ShardId` mapping and register old `ShardId`s for finalization.
    // We don't do the finalization here and instead rely on the background finalization task.
    let migrated_storage_collections_0dt = {
        let txn: &mut dyn StorageTxn<Timestamp> = txn;
        let storage_collection_metadata = txn.get_collection_metadata();
        let global_id_shards: BTreeMap<_, _> = global_id_shards
            .into_iter()
            .map(|(global_id, shard_id)| (GlobalId::System(global_id), shard_id))
            .filter(|(global_id, shard_id)| {
                storage_collection_metadata.get(global_id) != Some(shard_id)
            })
            .collect();
        let global_ids: BTreeSet<_> = global_id_shards.keys().cloned().collect();
        let mut old_shard_ids: BTreeSet<_> = txn
            .delete_collection_metadata(global_ids.clone())
            .into_iter()
            .map(|(_, shard_id)| shard_id)
            .collect();
        old_shard_ids.extend(
            migration_shards_to_finalize
                .iter()
                .map(|(_, shard_id)| shard_id),
        );
        txn.insert_unfinalized_shards(old_shard_ids).map_err(|e| {
            Error::new(ErrorKind::Internal(format!(
                "builtin migration failed: {e}"
            )))
        })?;
        txn.insert_collection_metadata(global_id_shards)
            .map_err(|e| {
                Error::new(ErrorKind::Internal(format!(
                    "builtin migration failed: {e}"
                )))
            })?;
        global_ids
    };

    // 6. Map the migrated `GlobalId`s to their corresponding `CatalogItemId`.
    let migrated_storage_collections_0dt = migrated_storage_collections_0dt
        .into_iter()
        .map(|gid| state.get_entry_by_global_id(&gid).id())
        .collect();

    let updates = txn.get_and_commit_op_updates();
    let builtin_table_updates = state
        .apply_updates_for_bootstrap(updates, local_expr_cache)
        .await;

    let cleanup_action = async move {
        if !read_only {
            let updates: Vec<_> = migration_shards_to_finalize
                .into_iter()
                .map(|(table_key, shard_id)| ((table_key, shard_id), upper, -1))
                .collect();
            if !updates.is_empty() {
                // Ignore any errors; these shards will get cleaned up in the next upgrade.
                // It's important to use `upper` here. If there was another concurrent write at
                // `upper`, then `updates` are no longer valid.
                let res =
                    write_to_migration_shard(updates, upper, &mut write_handle, &mut since_handle)
                        .await;
                if let Err(e) = res {
                    error!("Unable to remove old entries from migration shard: {e:?}");
                }
            }
        }
    }
    .boxed();

    Ok(BuiltinItemMigrationResult {
        builtin_table_updates,
        migrated_storage_collections_0dt,
        cleanup_action,
    })
}

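/// Fetch the migration shard's most recent upper.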
async fn fetch_upper(
    write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
) -> Timestamp {
    write_handle
        .fetch_recent_upper()
        .await
        .as_option()
        .cloned()
        .expect("we use a totally ordered time and never finalize the shard")
}

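/// Append `updates` to the migration shard, expecting `upper` to be the shard's current upper,
/// then downgrade the critical since to lag the new upper by 1 so the shard stays readable.
///
/// Returns the new upper on success. If another process wrote to the shard after `upper` was
/// observed, the compare-and-append fails and a migration fence error is returned.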
async fn write_to_migration_shard(
    updates: Vec<((TableKey, ShardId), Timestamp, StorageDiff)>,
    upper: Timestamp,
    write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
    since_handle: &mut SinceHandle<TableKey, ShardId, Timestamp, StorageDiff, i64>,
) -> Result<Timestamp, Error> {
    let next_upper = upper.step_forward();
    // Lag the since behind the upper by 1 to keep the shard readable.
    let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));
    let next_upper_antichain = Antichain::from_elem(next_upper);

    if let Err(err) = write_handle
        .compare_and_append(updates, Antichain::from_elem(upper), next_upper_antichain)
        .await
        .expect("invalid usage")
    {
        return Err(Error::new(ErrorKind::Durable(DurableCatalogError::Fence(
            FenceError::migration(err),
        ))));
    }

    // The since handle gives us the ability to fence out other downgraders using an opaque token.
    // (See the method documentation for details.)
    // That's not needed here, so we use the since handle's opaque token to avoid any comparison
    // failures.
    let opaque = *since_handle.opaque();
    let downgrade = since_handle
        .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
        .await;
    match downgrade {
        None => {}
        Some(Err(e)) => soft_panic_or_log!("found opaque value {e}, but expected {opaque}"),
        Some(Ok(updated)) => soft_assert_or_log!(
            updated == downgrade_to,
            "updated bound ({updated:?}) should match expected ({downgrade_to:?})"
        ),
    }

    Ok(next_upper)
}

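/// Persist `Codec` and `Schema` implementations for the migration shard's [`TableKey`] keys.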
mod persist_schema {
    use std::num::ParseIntError;

    use arrow::array::{StringArray, StringBuilder};
    use bytes::{BufMut, Bytes};
    use mz_persist_types::Codec;
    use mz_persist_types::codec_impls::{
        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
    };
    use mz_persist_types::columnar::Schema;
    use mz_persist_types::stats::NoneStats;

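    /// Key for entries in the migration shard.
    ///
    /// Round-trips through the string form `{global_id}-{build_version}` (e.g. `42-0.1.0`;
    /// values illustrative) via the `Display` and `FromStr` impls below.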
    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)]
    pub(super) struct TableKey {
        pub(super) global_id: u64,
        pub(super) build_version: semver::Version,
    }

    impl std::fmt::Display for TableKey {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "{}-{}", self.global_id, self.build_version)
        }
    }

    impl std::str::FromStr for TableKey {
        type Err = String;

        fn from_str(s: &str) -> Result<Self, Self::Err> {
            let parts: Vec<_> = s.splitn(2, '-').collect();
            let &[global_id, build_version] = parts.as_slice() else {
                return Err(format!("invalid TableKey '{s}'"));
            };
            let global_id = global_id
                .parse()
                .map_err(|e: ParseIntError| e.to_string())?;
            let build_version = build_version
                .parse()
                .map_err(|e: semver::Error| e.to_string())?;
            Ok(TableKey {
                global_id,
                build_version,
            })
        }
    }

    impl From<TableKey> for String {
        fn from(table_key: TableKey) -> Self {
            table_key.to_string()
        }
    }

    impl TryFrom<String> for TableKey {
        type Error = String;

        fn try_from(s: String) -> Result<Self, Self::Error> {
            s.parse()
        }
    }

    impl Default for TableKey {
        fn default() -> Self {
            Self {
                global_id: Default::default(),
                build_version: semver::Version::new(0, 0, 0),
            }
        }
    }

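    // `TableKey` is persisted via its human-readable string form; the `Codec` impl below simply
    // delegates to the `Display`/`FromStr` impls above.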
    impl Codec for TableKey {
        type Storage = ();
        type Schema = TableKeySchema;
        fn codec_name() -> String {
            "TableKey".into()
        }
        fn encode<B: BufMut>(&self, buf: &mut B) {
            buf.put(self.to_string().as_bytes())
        }
        fn decode<'a>(buf: &'a [u8], _schema: &TableKeySchema) -> Result<Self, String> {
            let table_key = String::from_utf8(buf.to_owned()).map_err(|err| err.to_string())?;
            table_key.parse()
        }
        fn encode_schema(_schema: &Self::Schema) -> Bytes {
            Bytes::new()
        }
        fn decode_schema(buf: &Bytes) -> Self::Schema {
            assert_eq!(*buf, Bytes::new());
            TableKeySchema
        }
    }

    impl SimpleColumnarData for TableKey {
        type ArrowBuilder = StringBuilder;
        type ArrowColumn = StringArray;

        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
            builder.values_slice().len()
        }

        fn push(&self, builder: &mut Self::ArrowBuilder) {
            builder.append_value(&self.to_string());
        }
        fn push_null(builder: &mut Self::ArrowBuilder) {
            builder.append_null();
        }
        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
            *self = column.value(idx).parse().expect("should be valid TableKey");
        }
    }

    /// An implementation of [Schema] for [TableKey].
    #[derive(Debug, PartialEq)]
    pub(super) struct TableKeySchema;

    impl Schema<TableKey> for TableKeySchema {
        type ArrowColumn = StringArray;
        type Statistics = NoneStats;

        type Decoder = SimpleColumnarDecoder<TableKey>;
        type Encoder = SimpleColumnarEncoder<TableKey>;

        fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
            Ok(SimpleColumnarEncoder::default())
        }

        fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
            Ok(SimpleColumnarDecoder::new(col))
        }
    }
}