// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Migrations for builtin items.

use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

use futures::future::BoxFuture;
use futures::FutureExt;
use mz_catalog::builtin::{BuiltinTable, Fingerprint, BUILTINS};
use mz_catalog::config::BuiltinItemMigrationConfig;
use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
use mz_catalog::durable::{
    DurableCatalogError, FenceError, SystemObjectDescription, SystemObjectMapping, Transaction,
};
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::CatalogItem;
use mz_catalog::SYSTEM_CONN_ID;
use mz_ore::collections::CollectionExt;
use mz_ore::{halt, soft_assert_or_log, soft_panic_or_log};
use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient};
use mz_persist_types::codec_impls::ShardIdSchema;
use mz_persist_types::ShardId;
use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
use mz_sql::catalog::CatalogItem as _;
use mz_storage_client::controller::StorageTxn;
use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
use tracing::{debug, error};

use crate::catalog::open::builtin_item_migration::persist_schema::{TableKey, TableKeySchema};
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, Catalog, CatalogState};

/// The results of a builtin item migration.
pub(crate) struct BuiltinItemMigrationResult {
    /// A vec of updates to apply to the builtin tables.
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    /// A set of storage collections to drop (only used by legacy migration).
    pub(crate) storage_collections_to_drop: BTreeSet<GlobalId>,
    /// A set of new shards that may need to be initialized (only used by 0dt migration).
    pub(crate) migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// Some cleanup action to take once the migration has been made durable.
    pub(crate) cleanup_action: BoxFuture<'static, ()>,
}

/// Perform migrations for any builtin items that may have changed between versions.
pub(crate) async fn migrate_builtin_items(
    state: &mut CatalogState,
    txn: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    migrated_builtins: Vec<CatalogItemId>,
    config: BuiltinItemMigrationConfig,
) -> Result<BuiltinItemMigrationResult, Error> {
    match config {
        BuiltinItemMigrationConfig::Legacy => {
            migrate_builtin_items_legacy(state, txn, migrated_builtins).await
        }
        BuiltinItemMigrationConfig::ZeroDownTime {
            persist_client,
            deploy_generation,
            read_only,
        } => {
            migrate_builtin_items_0dt(
                state,
                txn,
                local_expr_cache,
                persist_client,
                migrated_builtins,
                deploy_generation,
                read_only,
            )
            .await
        }
    }
}

/// The legacy method for builtin migrations is to drop all migrated items and all of their
/// dependents and re-create them all with the new schema and new global IDs.
async fn migrate_builtin_items_legacy(
    state: &mut CatalogState,
    txn: &mut Transaction<'_>,
    migrated_builtins: Vec<CatalogItemId>,
) -> Result<BuiltinItemMigrationResult, Error> {
    let id_fingerprint_map: BTreeMap<_, _> = BUILTINS::iter(&state.config().builtins_cfg)
        .map(|builtin| {
            let id = state.resolve_builtin_object(builtin);
            let fingerprint = builtin.fingerprint();
            (id, fingerprint)
        })
        .collect();
    let mut builtin_migration_metadata = Catalog::generate_builtin_migration_metadata(
        state,
        txn,
        migrated_builtins,
        id_fingerprint_map,
    )?;
    let builtin_table_updates =
        Catalog::apply_builtin_migration(state, txn, &mut builtin_migration_metadata).await?;

    let cleanup_action = async {}.boxed();
    Ok(BuiltinItemMigrationResult {
        builtin_table_updates,
        storage_collections_to_drop: builtin_migration_metadata.previous_storage_collection_ids,
        migrated_storage_collections_0dt: BTreeSet::new(),
        cleanup_action,
    })
}

/// An implementation of builtin item migrations that is compatible with zero down-time upgrades.
/// The issue with the legacy approach is that it mints new global IDs for each migrated item and
/// its descendants, without durably writing those IDs down in the catalog. As a result, the
/// previous Materialize version, which is still running, may allocate the same global IDs. This
/// would cause confusion for the current version when it's promoted to the leader because its
/// definition of global IDs will no longer be valid. At best, the current version would have to
/// rehydrate all objects that depend on migrated items. At worst, it would panic.
///
/// The high level description of this approach is that we create new shards for each migrated
/// builtin table with the new table schema, without changing the global ID. Dependent objects are
/// not modified but now read from the new shards.
///
/// A detailed description of this approach follows. It's important that all of these steps are
/// idempotent, so that we can safely crash at any point and so that running them when no upgrade
/// is in progress is a no-op.
///
///    1. Each environment has a dedicated persist shard, called the migration shard, that allows
///       environments to durably write down metadata while in read-only mode. The shard is a
///       mapping of `(GlobalId, deploy_generation)` to `ShardId` (an example of its logical
///       contents is sketched after this list).
///    2. Collect the `GlobalId` of all migrated tables for the current deploy generation.
///    3. Read in the current contents of the migration shard.
///    4. Collect all the `ShardId`s from the migration shard that are not at the current
///       `deploy_generation` or are not in the set of migrated tables.
///       a. If they ARE NOT mapped to a `GlobalId` in the storage metadata then they are shards
///          from an incomplete migration. Finalize them and remove them from the migration shard.
///          Note: care must be taken to not remove them from the migration shard until we are
///          sure that they will be finalized, otherwise the shards will leak.
///       b. If they ARE mapped to a `GlobalId` in the storage metadata then they are shards from a
///          complete migration. Remove them from the migration shard.
///    5. Collect all the `GlobalId`s of tables that are migrated, but not in the migration shard
///       for the current deploy generation. Generate new `ShardId`s and add them to the migration
///       shard.
///    6. At this point the migration shard should only logically contain a mapping of migrated
///       table `GlobalId`s to new `ShardId`s for the current deploy generation. For each of these
///       `GlobalId`s such that the `ShardId` isn't already in the storage metadata:
///       a. Remove the current `GlobalId` to `ShardId` mapping from the storage metadata.
///       b. Finalize the removed `ShardId`s.
///       c. Insert the new `GlobalId` to `ShardId` mapping into the storage metadata.
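///
/// For illustration only (all IDs below are made up), the contents read in step 3 might
/// logically look like:
///
/// ```text
/// ((GlobalId, deploy_generation), ShardId)
/// ((455, 6), s1111-...)  // previous generation; cleaned up in step 4
/// ((455, 7), s2222-...)  // current generation; applied to the storage metadata in step 6
/// ```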
///
/// This approach breaks the abstraction boundary between the catalog and the storage metadata,
/// but rare, extremely useful abstraction breaks like this are the exact reason the two are
/// co-located.
///
/// Since the new shards are created in read-only mode, they will be left empty and all dependent
/// items will fail to hydrate.
/// TODO(jkosh44) Back-fill these tables in read-only mode so they can properly hydrate.
///
/// While in read-only mode we write the migration changes to `txn`, which updates the in-memory
/// catalog and causes the new shards to be created in storage. However, we don't have to worry
/// about the catalog changes becoming durable because the `txn` is in savepoint mode. When we
/// re-execute this migration as the leader (i.e. outside of read-only mode), `txn` will be
/// writable and the migration will be made durable in the catalog. We always write directly to
/// the migration shard, regardless of read-only mode, so we have to be careful not to remove
/// anything from the migration shard until we're sure that its results have been made durable
/// elsewhere.
async fn migrate_builtin_items_0dt(
    state: &mut CatalogState,
    txn: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    persist_client: PersistClient,
    migrated_builtins: Vec<CatalogItemId>,
    deploy_generation: u64,
    read_only: bool,
) -> Result<BuiltinItemMigrationResult, Error> {
    assert_eq!(
        read_only,
        txn.is_savepoint(),
        "txn must be in savepoint mode when read_only is true, and in writable mode when read_only is false"
    );

    // 0. Update durably stored fingerprints.
    let id_fingerprint_map: BTreeMap<_, _> = BUILTINS::iter(&state.config().builtins_cfg)
        .map(|builtin| {
            let id = state.resolve_builtin_object(builtin);
            let fingerprint = builtin.fingerprint();
            (id, fingerprint)
        })
        .collect();
    let mut migrated_system_object_mappings = BTreeMap::new();
    for item_id in &migrated_builtins {
        let fingerprint = id_fingerprint_map
            .get(item_id)
            .expect("missing fingerprint");
        let entry = state.get_entry(item_id);
        let schema_name = state
            .get_schema(
                &entry.name().qualifiers.database_spec,
                &entry.name().qualifiers.schema_spec,
                entry.conn_id().unwrap_or(&SYSTEM_CONN_ID),
            )
            .name
            .schema
            .as_str();
        // Builtin items can only be referenced by a single GlobalId.
        let global_id = state.get_entry(item_id).global_ids().into_element();

        migrated_system_object_mappings.insert(
            *item_id,
            SystemObjectMapping {
                description: SystemObjectDescription {
                    schema_name: schema_name.to_string(),
                    object_type: entry.item_type(),
                    object_name: entry.name().item.clone(),
                },
                unique_identifier: SystemObjectUniqueIdentifier {
                    catalog_id: *item_id,
                    global_id,
                    fingerprint: fingerprint.clone(),
                },
            },
        );
    }
    txn.update_system_object_mappings(migrated_system_object_mappings)?;

    // 1. Open migration shard.
    let organization_id = state.config.environment_id.organization_id();
    let shard_id = txn
        .get_builtin_migration_shard()
        .expect("builtin migration shard should exist for opened catalogs");
    let diagnostics = Diagnostics {
        shard_name: "builtin_migration".to_string(),
        handle_purpose: format!("builtin table migration shard for org {organization_id:?} generation {deploy_generation:?}"),
    };
    let mut since_handle: SinceHandle<TableKey, ShardId, Timestamp, Diff, i64> = persist_client
        .open_critical_since(
            shard_id,
            // TODO: We may need to use a different critical reader
            // id for this if we want to be able to introspect it via SQL.
            PersistClient::CONTROLLER_CRITICAL_SINCE,
            diagnostics.clone(),
        )
        .await
        .expect("invalid usage");
    let (mut write_handle, mut read_handle): (
        WriteHandle<TableKey, ShardId, Timestamp, Diff>,
        ReadHandle<TableKey, ShardId, Timestamp, Diff>,
    ) = persist_client
        .open(
            shard_id,
            Arc::new(TableKeySchema),
            Arc::new(ShardIdSchema),
            diagnostics,
            USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
        )
        .await
        .expect("invalid usage");
    // Commit an empty write at the minimum timestamp so the shard is always readable.
    const EMPTY_UPDATES: &[((TableKey, ShardId), Timestamp, Diff)] = &[];
    let res = write_handle
        .compare_and_append(
            EMPTY_UPDATES,
            Antichain::from_elem(Timestamp::minimum()),
            Antichain::from_elem(Timestamp::minimum().step_forward()),
        )
        .await
        .expect("invalid usage");
    if let Err(e) = res {
        debug!("migration shard already initialized: {e:?}");
    }

    // 2. Get the `GlobalId` of all migrated storage collections.
    let migrated_storage_collections: BTreeSet<_> = migrated_builtins
        .into_iter()
        .filter_map(|item_id| {
            let gid = match state.get_entry(&item_id).item() {
                CatalogItem::Table(table) => {
                    let mut ids: Vec<_> = table.global_ids().collect();
                    assert_eq!(ids.len(), 1, "{ids:?}");
                    ids.pop().expect("checked length")
                }
                CatalogItem::Source(source) => source.global_id(),
                CatalogItem::MaterializedView(mv) => mv.global_id(),
                CatalogItem::ContinualTask(ct) => ct.global_id(),
                CatalogItem::Log(_)
                | CatalogItem::Sink(_)
                | CatalogItem::View(_)
                | CatalogItem::Index(_)
                | CatalogItem::Type(_)
                | CatalogItem::Func(_)
                | CatalogItem::Secret(_)
                | CatalogItem::Connection(_) => return None,
            };
            let GlobalId::System(raw_gid) = gid else {
                unreachable!(
                    "builtin objects must have system ID, found: {item_id:?} with {gid:?}"
                );
            };
            Some(raw_gid)
        })
        .collect();

    // 3. Read in the current contents of the migration shard.
    // We intentionally fetch the upper AFTER opening the read handle to address races where some
    // other process advances the upper and the since concurrently.
    let upper = fetch_upper(&mut write_handle).await;
    // The empty write above should ensure that the upper is at least 1.
    let as_of = upper.checked_sub(1).ok_or_else(|| {
        Error::new(ErrorKind::Internal(format!(
            "builtin migration failed, unexpected upper: {upper:?}"
        )))
    })?;
    let since = read_handle.since();
    assert!(
        since.less_equal(&as_of),
        "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
    );
    let as_of = Antichain::from_elem(as_of);
    let snapshot = read_handle
        .snapshot_and_fetch(as_of)
        .await
        .expect("we have advanced the as_of by the since");
    soft_assert_or_log!(
        snapshot.iter().all(|(_, _, diff)| *diff == 1),
        "snapshot_and_fetch guarantees a consolidated result: {snapshot:?}"
    );
    let mut global_id_shards: BTreeMap<_, _> = snapshot
        .into_iter()
        .map(|((key, value), _ts, _diff)| {
            let table_key = key.expect("persist decoding error");
            let shard_id = value.expect("persist decoding error");
            (table_key, shard_id)
        })
        .collect();

    // 4. Clean up contents of migration shard.
    let mut migrated_shard_updates: Vec<((TableKey, ShardId), Timestamp, Diff)> = Vec::new();
    let mut migration_shards_to_finalize = BTreeSet::new();
    let storage_collection_metadata = {
        let txn: &mut dyn StorageTxn<Timestamp> = txn;
        txn.get_collection_metadata()
    };
    for (table_key, shard_id) in global_id_shards.clone() {
        if table_key.deploy_generation > deploy_generation {
            halt!(
                "saw deploy generation {}, which is greater than current deploy generation {}",
                table_key.deploy_generation,
                deploy_generation
            );
        }

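        // Entries from a previous deploy generation, or whose collection is no longer in the
        // migrated set, are stale. If the storage metadata still points at the stale shard, the
        // migration that wrote it completed, so we only need to retract the entry from the
        // migration shard. Otherwise the shard was never adopted; register it for finalization
        // and retract its entry later, in `cleanup_action`, once the finalization registration
        // has been made durable.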
        if !migrated_storage_collections.contains(&table_key.global_id)
            || table_key.deploy_generation < deploy_generation
        {
            global_id_shards.remove(&table_key);
            if storage_collection_metadata.get(&GlobalId::System(table_key.global_id))
                == Some(&shard_id)
            {
                migrated_shard_updates.push(((table_key, shard_id.clone()), upper, -1));
            } else {
                migration_shards_to_finalize.insert((table_key, shard_id));
            }
        }
    }

    // 5. Add migrated tables to migration shard for current generation.
    let mut global_id_shards: BTreeMap<_, _> = global_id_shards
        .into_iter()
        .map(|(table_key, shard_id)| (table_key.global_id, shard_id))
        .collect();
    for global_id in migrated_storage_collections {
        if !global_id_shards.contains_key(&global_id) {
            let shard_id = ShardId::new();
            global_id_shards.insert(global_id, shard_id);
            let table_key = TableKey {
                global_id,
                deploy_generation,
            };
            migrated_shard_updates.push(((table_key, shard_id), upper, 1));
        }
    }

    // It's very important that we use the same `upper` that was used to read in a snapshot of the
    // shard. If someone updated the shard after we read, then this write will fail.
    let upper = if !migrated_shard_updates.is_empty() {
        write_to_migration_shard(
            migrated_shard_updates,
            upper,
            &mut write_handle,
            &mut since_handle,
        )
        .await?
    } else {
        upper
    };

    // 6. Update the `GlobalId` to `ShardId` mapping and register old `ShardId`s for finalization.
    //    We don't do the finalization here and instead rely on the background finalization task.
    let migrated_storage_collections_0dt = {
        let txn: &mut dyn StorageTxn<Timestamp> = txn;
        let storage_collection_metadata = txn.get_collection_metadata();
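        // Keep only mappings that aren't already recorded in the storage metadata, so that
        // re-running the migration is a no-op for entries that have already been applied.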
        let global_id_shards: BTreeMap<_, _> = global_id_shards
            .into_iter()
            .map(|(global_id, shard_id)| (GlobalId::System(global_id), shard_id))
            .filter(|(global_id, shard_id)| {
                storage_collection_metadata.get(global_id) != Some(shard_id)
            })
            .collect();
        let global_ids: BTreeSet<_> = global_id_shards.keys().cloned().collect();
        let mut old_shard_ids: BTreeSet<_> = txn
            .delete_collection_metadata(global_ids.clone())
            .into_iter()
            .map(|(_, shard_id)| shard_id)
            .collect();
        old_shard_ids.extend(
            migration_shards_to_finalize
                .iter()
                .map(|(_, shard_id)| shard_id),
        );
        txn.insert_unfinalized_shards(old_shard_ids).map_err(|e| {
            Error::new(ErrorKind::Internal(format!(
                "builtin migration failed: {e}"
            )))
        })?;
        txn.insert_collection_metadata(global_id_shards)
            .map_err(|e| {
                Error::new(ErrorKind::Internal(format!(
                    "builtin migration failed: {e}"
                )))
            })?;
        global_ids
    };

    // 7. Map the migrated `GlobalId`s to their corresponding `CatalogItemId`.
    let migrated_storage_collections_0dt = migrated_storage_collections_0dt
        .into_iter()
        .map(|gid| state.get_entry_by_global_id(&gid).id())
        .collect();

    let updates = txn.get_and_commit_op_updates();
    let builtin_table_updates = state
        .apply_updates_for_bootstrap(updates, local_expr_cache)
        .await;

    let cleanup_action = async move {
        if !read_only {
            let updates: Vec<_> = migration_shards_to_finalize
                .into_iter()
                .map(|(table_key, shard_id)| ((table_key, shard_id), upper, -1))
                .collect();
            if !updates.is_empty() {
                // Ignore any errors; these shards will get cleaned up in the next upgrade.
                // It's important to use `upper` here. If there was another concurrent write at
                // `upper`, then `updates` are no longer valid.
                let res =
                    write_to_migration_shard(updates, upper, &mut write_handle, &mut since_handle)
                        .await;
                if let Err(e) = res {
                    error!("Unable to remove old entries from migration shard: {e:?}");
                }
            }
        }
    }
    .boxed();

    Ok(BuiltinItemMigrationResult {
        builtin_table_updates,
        storage_collections_to_drop: BTreeSet::new(),
        migrated_storage_collections_0dt,
        cleanup_action,
    })
}

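/// Fetches the current upper of the migration shard.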
async fn fetch_upper(
    write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, Diff>,
) -> Timestamp {
    write_handle
        .fetch_recent_upper()
        .await
        .as_option()
        .cloned()
        .expect("we use a totally ordered time and never finalize the shard")
}

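/// Appends `updates` to the migration shard at `upper`, advances the shard's upper to
/// `upper + 1`, and downgrades the since to lag the new upper by 1 so the shard stays readable.
///
/// Returns the new upper on success, or a fence error if another writer advanced the upper first.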
async fn write_to_migration_shard(
    updates: Vec<((TableKey, ShardId), Timestamp, Diff)>,
    upper: Timestamp,
    write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, Diff>,
    since_handle: &mut SinceHandle<TableKey, ShardId, Timestamp, Diff, i64>,
) -> Result<Timestamp, Error> {
    let next_upper = upper.step_forward();
    // Lag the since 1 behind the new upper to keep the shard readable.
    let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));
    let next_upper_antichain = Antichain::from_elem(next_upper);

    if let Err(err) = write_handle
        .compare_and_append(updates, Antichain::from_elem(upper), next_upper_antichain)
        .await
        .expect("invalid usage")
    {
        return Err(Error::new(ErrorKind::Durable(DurableCatalogError::Fence(
            FenceError::migration(err),
        ))));
    }

    // The since handle gives us the ability to fence out other downgraders using an opaque token.
    // (See the method documentation for details.)
    // That's not needed here, so we use the since handle's opaque token to avoid any comparison
    // failures.
    let opaque = *since_handle.opaque();
    let downgrade = since_handle
        .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
        .await;
    match downgrade {
        None => {}
        Some(Err(e)) => soft_panic_or_log!("found opaque value {e}, but expected {opaque}"),
        Some(Ok(updated)) => soft_assert_or_log!(
            updated == downgrade_to,
            "updated bound ({updated:?}) should match expected ({downgrade_to:?})"
        ),
    }

    Ok(next_upper)
}

mod persist_schema {
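    //! Persist codec and schema implementations for the migration shard's key type.
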
    use std::num::ParseIntError;

    use arrow::array::{StringArray, StringBuilder};
    use bytes::{BufMut, Bytes};
    use mz_persist_types::codec_impls::{
        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
    };
    use mz_persist_types::columnar::Schema2;
    use mz_persist_types::stats::NoneStats;
    use mz_persist_types::Codec;

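    /// The key type of the migration shard: a builtin storage collection's raw system
    /// `GlobalId` paired with the deploy generation performing the migration.
    ///
    /// Encoded as the string `"{global_id}-{deploy_generation}"`, e.g. `"455-7"` (values chosen
    /// for illustration only).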
    #[derive(Debug, Clone, Default, Eq, Ord, PartialEq, PartialOrd)]
    pub(super) struct TableKey {
        pub(super) global_id: u64,
        pub(super) deploy_generation: u64,
    }

    impl std::fmt::Display for TableKey {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "{}-{}", self.global_id, self.deploy_generation)
        }
    }

    impl std::str::FromStr for TableKey {
        type Err = String;

        fn from_str(s: &str) -> Result<Self, Self::Err> {
            let parts: Vec<_> = s.split('-').collect();
            let &[global_id, deploy_generation] = parts.as_slice() else {
                return Err(format!("invalid TableKey '{s}'"));
            };
            let global_id = global_id
                .parse()
                .map_err(|e: ParseIntError| e.to_string())?;
            let deploy_generation = deploy_generation
                .parse()
                .map_err(|e: ParseIntError| e.to_string())?;
            Ok(TableKey {
                global_id,
                deploy_generation,
            })
        }
    }

    impl From<TableKey> for String {
        fn from(table_key: TableKey) -> Self {
            table_key.to_string()
        }
    }

    impl TryFrom<String> for TableKey {
        type Error = String;

        fn try_from(s: String) -> Result<Self, Self::Error> {
            s.parse()
        }
    }

    impl Codec for TableKey {
        type Storage = ();
        type Schema = TableKeySchema;
        fn codec_name() -> String {
            "TableKey".into()
        }
        fn encode<B: BufMut>(&self, buf: &mut B) {
            buf.put(self.to_string().as_bytes())
        }
        fn decode<'a>(buf: &'a [u8], _schema: &TableKeySchema) -> Result<Self, String> {
            let table_key = String::from_utf8(buf.to_owned()).map_err(|err| err.to_string())?;
            table_key.parse()
        }
        fn encode_schema(_schema: &Self::Schema) -> Bytes {
            Bytes::new()
        }
        fn decode_schema(buf: &Bytes) -> Self::Schema {
            assert_eq!(*buf, Bytes::new());
            TableKeySchema
        }
    }

    impl SimpleColumnarData for TableKey {
        type ArrowBuilder = StringBuilder;
        type ArrowColumn = StringArray;

        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
            builder.values_slice().len()
        }

        fn push(&self, builder: &mut Self::ArrowBuilder) {
            builder.append_value(&self.to_string());
        }
        fn push_null(builder: &mut Self::ArrowBuilder) {
            builder.append_null();
        }
        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
            *self = column.value(idx).parse().expect("should be valid TableKey");
        }
    }

    /// An implementation of [Schema2] for [TableKey].
    #[derive(Debug, PartialEq)]
    pub(super) struct TableKeySchema;

    impl Schema2<TableKey> for TableKeySchema {
        type ArrowColumn = StringArray;
        type Statistics = NoneStats;

        type Decoder = SimpleColumnarDecoder<TableKey>;
        type Encoder = SimpleColumnarEncoder<TableKey>;

        fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
            Ok(SimpleColumnarEncoder::default())
        }

        fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
            Ok(SimpleColumnarDecoder::new(col))
        }
    }
}