mz_adapter/catalog/open/builtin_item_migration.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Migrations for builtin items.

use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

use futures::FutureExt;
use futures::future::BoxFuture;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{BUILTINS, BuiltinTable, Fingerprint};
use mz_catalog::config::BuiltinItemMigrationConfig;
use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
use mz_catalog::durable::{
    DurableCatalogError, FenceError, SystemObjectDescription, SystemObjectMapping, Transaction,
};
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::CatalogItem;
use mz_ore::collections::CollectionExt;
use mz_ore::{halt, soft_assert_or_log, soft_panic_or_log};
use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient};
use mz_persist_types::ShardId;
use mz_persist_types::codec_impls::ShardIdSchema;
use mz_repr::{CatalogItemId, GlobalId, Timestamp};
use mz_sql::catalog::CatalogItem as _;
use mz_storage_client::controller::StorageTxn;
use mz_storage_types::StorageDiff;
use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
use tracing::{debug, error};

use crate::catalog::open::builtin_item_migration::persist_schema::{TableKey, TableKeySchema};
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, CatalogState};

/// The results of a builtin item migration.
pub(crate) struct BuiltinItemMigrationResult {
    /// A vec of updates to apply to the builtin tables.
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    /// A set of new shards that may need to be initialized.
    pub(crate) migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// Some cleanup action to take once the migration has been made durable.
    pub(crate) cleanup_action: BoxFuture<'static, ()>,
}

/// Perform migrations for any builtin items that may have changed between versions.
pub(crate) async fn migrate_builtin_items(
    state: &mut CatalogState,
    txn: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    migrated_builtins: Vec<CatalogItemId>,
    BuiltinItemMigrationConfig {
        persist_client,
        read_only,
    }: BuiltinItemMigrationConfig,
) -> Result<BuiltinItemMigrationResult, Error> {
    migrate_builtin_items_0dt(
        state,
        txn,
        local_expr_cache,
        persist_client,
        migrated_builtins,
        read_only,
    )
    .await
}

/// An implementation of builtin item migrations that is compatible with zero down-time upgrades.
/// The issue with the legacy approach is that it mints new global IDs for each migrated item and
/// its descendants, without durably writing those IDs down in the catalog. As a result, the
/// previous Materialize version, which is still running, may allocate the same global IDs. This
/// would cause confusion for the current version when it's promoted to the leader because its
/// definition of global IDs will no longer be valid. At best, the current version would have to
/// rehydrate all objects that depend on migrated items. At worst, it would panic.
///
/// The high-level description of this approach is that we create a new shard for each migrated
/// builtin table with the new table schema, without changing the global ID. Dependent objects are
/// not modified but now read from the new shards.
///
/// A detailed description of this approach follows. It's important that all of these steps are
/// idempotent, so that we can safely crash at any point, and so that re-running the migration
/// when nothing needs to change is a no-op.
///
///    1. Each environment has a dedicated persist shard, called the migration shard, that allows
///       environments to durably write down metadata while in read-only mode. The shard is a
///       mapping of `(GlobalId, build_version)` to `ShardId`.
///    2. Collect the `GlobalId` of all migrated tables for the current build version.
///    3. Read in the current contents of the migration shard.
///    4. Collect all the `ShardId`s from the migration shard that are not at the current
///       `build_version` or are not in the set of migrated tables.
///       a. If they ARE NOT mapped to a `GlobalId` in the storage metadata then they are shards
///          from an incomplete migration. Finalize them and remove them from the migration shard.
///          Note: care must be taken to not remove the shard from the migration shard until we are
///          sure that they will be finalized, otherwise the shard will leak.
///       b. If they ARE mapped to a `GlobalId` in the storage metadata then they are shards from a
///          complete migration. Remove them from the migration shard.
///    5. Collect all the `GlobalId`s of tables that are migrated, but not in the migration shard
///       for the current build version. Generate new `ShardId`s and add them to the migration
///       shard.
///    6. At this point the migration shard should only logically contain a mapping of migrated
///       table `GlobalId`s to new `ShardId`s for the current build version. For each of these
///       `GlobalId`s such that the `ShardId` isn't already in the storage metadata:
///       a. Remove the current `GlobalId` to `ShardId` mapping from the storage metadata.
///       b. Finalize the removed `ShardId`s.
///       c. Insert the new `GlobalId` to `ShardId` mapping into the storage metadata.
///
/// This approach breaks the abstraction boundary between the catalog and the storage metadata,
/// but rare, extremely useful abstraction breaks like this one are the exact reason the two are
/// co-located.
///
/// Since the new shards are created in read-only mode, they will be left empty and all dependent
/// items will fail to hydrate.
/// TODO(jkosh44) Back-fill these tables in read-only mode so they can properly hydrate.
///
/// While in read-only mode we write the migration changes to `txn`, which will update the
/// in-memory catalog, which will cause the new shards to be created in storage. However, we don't
/// have to worry about the catalog changes becoming durable because the `txn` is in savepoint
/// mode. When we re-execute this migration as the leader (i.e. outside of read-only mode), `txn`
/// will be writable and the migration will be made durable in the catalog. We always write
/// directly to the migration shard, regardless of read-only mode. So we have to be careful not to
/// remove anything from the migration shard until we're sure that its results have been made
/// durable elsewhere.
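///
/// For example (all IDs below are hypothetical), suppose the builtin table with system
/// `GlobalId` 42 changed schema in build version v0.2.0, and an earlier run of the migration
/// left shard `s1` behind. A sketch of the state transitions described above:
///
/// ```text
/// migration shard before:        {(42, v0.1.0) -> s1}   // leftover from the old version
/// after steps 4 and 5:           {(42, v0.2.0) -> s2}   // s1 cleaned up per step 4
/// storage metadata after step 6: {42 -> s2}             // replaced shard queued for finalization
/// ```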
async fn migrate_builtin_items_0dt(
    state: &mut CatalogState,
    txn: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    persist_client: PersistClient,
    migrated_builtins: Vec<CatalogItemId>,
    read_only: bool,
) -> Result<BuiltinItemMigrationResult, Error> {
    assert_eq!(
        read_only,
        txn.is_savepoint(),
        "txn must be in savepoint mode when read_only is true, and in writable mode when read_only is false"
    );

    let build_version = state.config.build_info.semver_version();

    // 0. Update durably stored fingerprints.
    let id_fingerprint_map: BTreeMap<_, _> = BUILTINS::iter(&state.config().builtins_cfg)
        .map(|builtin| {
            let id = state.resolve_builtin_object(builtin);
            let fingerprint = builtin.fingerprint();
            (id, fingerprint)
        })
        .collect();
    let mut migrated_system_object_mappings = BTreeMap::new();
    for item_id in &migrated_builtins {
        let fingerprint = id_fingerprint_map
            .get(item_id)
            .expect("missing fingerprint");
        let entry = state.get_entry(item_id);
        let schema_name = state
            .get_schema(
                &entry.name().qualifiers.database_spec,
                &entry.name().qualifiers.schema_spec,
                entry.conn_id().unwrap_or(&SYSTEM_CONN_ID),
            )
            .name
            .schema
            .as_str();
        // Builtin items can only be referenced by a single `GlobalId`.
        let global_id = state.get_entry(item_id).global_ids().into_element();

        migrated_system_object_mappings.insert(
            *item_id,
            SystemObjectMapping {
                description: SystemObjectDescription {
                    schema_name: schema_name.to_string(),
                    object_type: entry.item_type(),
                    object_name: entry.name().item.clone(),
                },
                unique_identifier: SystemObjectUniqueIdentifier {
                    catalog_id: *item_id,
                    global_id,
                    fingerprint: fingerprint.clone(),
                },
            },
        );
    }
    txn.update_system_object_mappings(migrated_system_object_mappings)?;

    // 1. Open migration shard.
    let organization_id = state.config.environment_id.organization_id();
    let shard_id = txn
        .get_builtin_migration_shard()
        .expect("builtin migration shard should exist for opened catalogs");
    let diagnostics = Diagnostics {
        shard_name: "builtin_migration".to_string(),
        handle_purpose: format!(
            "builtin table migration shard for org {organization_id:?} version {build_version:?}"
        ),
    };
    let mut since_handle: SinceHandle<TableKey, ShardId, Timestamp, StorageDiff, i64> =
        persist_client
            .open_critical_since(
                shard_id,
                // TODO: We may need to use a different critical reader
                // id for this if we want to be able to introspect it via SQL.
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                diagnostics.clone(),
            )
            .await
            .expect("invalid usage");
    let (mut write_handle, mut read_handle): (
        WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
        ReadHandle<TableKey, ShardId, Timestamp, StorageDiff>,
    ) = persist_client
        .open(
            shard_id,
            Arc::new(TableKeySchema),
            Arc::new(ShardIdSchema),
            diagnostics,
            USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
        )
        .await
        .expect("invalid usage");
    // Commit an empty write at the minimum timestamp so the shard is always readable.
    const EMPTY_UPDATES: &[((TableKey, ShardId), Timestamp, StorageDiff)] = &[];
    let res = write_handle
        .compare_and_append(
            EMPTY_UPDATES,
            Antichain::from_elem(Timestamp::minimum()),
            Antichain::from_elem(Timestamp::minimum().step_forward()),
        )
        .await
        .expect("invalid usage");
    if let Err(e) = res {
        debug!("migration shard already initialized: {e:?}");
    }

    // 2. Get the `GlobalId` of all migrated storage collections.
    let migrated_storage_collections: BTreeSet<_> = migrated_builtins
        .into_iter()
        .filter_map(|item_id| {
            let gid = match state.get_entry(&item_id).item() {
                CatalogItem::Table(table) => {
                    let mut ids: Vec<_> = table.global_ids().collect();
                    assert_eq!(ids.len(), 1, "{ids:?}");
                    ids.pop().expect("checked length")
                }
                CatalogItem::Source(source) => source.global_id(),
                CatalogItem::MaterializedView(mv) => mv.global_id(),
                CatalogItem::ContinualTask(ct) => ct.global_id(),
                CatalogItem::Log(_)
                | CatalogItem::Sink(_)
                | CatalogItem::View(_)
                | CatalogItem::Index(_)
                | CatalogItem::Type(_)
                | CatalogItem::Func(_)
                | CatalogItem::Secret(_)
                | CatalogItem::Connection(_) => return None,
            };
            let GlobalId::System(raw_gid) = gid else {
                unreachable!(
                    "builtin objects must have system ID, found: {item_id:?} with {gid:?}"
                );
            };
            Some(raw_gid)
        })
        .collect();

    // 3. Read in the current contents of the migration shard.
    // We intentionally fetch the upper AFTER opening the read handle to address races where the
    // upper and since move forward in some other process.
    let upper = fetch_upper(&mut write_handle).await;
    // The empty write above should ensure that the upper is at least 1.
    let as_of = upper.checked_sub(1).ok_or_else(|| {
        Error::new(ErrorKind::Internal(format!(
            "builtin migration failed, unexpected upper: {upper:?}"
        )))
    })?;
    let since = read_handle.since();
    assert!(
        since.less_equal(&as_of),
        "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
    );
    let as_of = Antichain::from_elem(as_of);
    let snapshot = read_handle
        .snapshot_and_fetch(as_of)
        .await
        .expect("as_of is readable: since <= as_of was asserted above");
    soft_assert_or_log!(
        snapshot.iter().all(|(_, _, diff)| *diff == 1),
        "snapshot_and_fetch guarantees a consolidated result: {snapshot:?}"
    );
    let mut global_id_shards: BTreeMap<_, _> = snapshot
        .into_iter()
        .map(|((key, value), _ts, _diff)| {
            let table_key = key.expect("persist decoding error");
            let shard_id = value.expect("persist decoding error");
            (table_key, shard_id)
        })
        .collect();

    // 4. Clean up contents of migration shard.
    let mut migrated_shard_updates: Vec<((TableKey, ShardId), Timestamp, StorageDiff)> = Vec::new();
    let mut migration_shards_to_finalize = BTreeSet::new();
    let storage_collection_metadata = {
        let txn: &mut dyn StorageTxn<Timestamp> = txn;
        txn.get_collection_metadata()
    };
    for (table_key, shard_id) in global_id_shards.clone() {
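        // If a later build has already written to the migration shard, this (older) version
        // has been fenced out and must not proceed.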
        if table_key.build_version > build_version {
            halt!(
                "saw build version {}, which is greater than current build version {}",
                table_key.build_version,
                build_version
            );
        }

        if !migrated_storage_collections.contains(&table_key.global_id)
            || table_key.build_version < build_version
        {
            global_id_shards.remove(&table_key);
            if storage_collection_metadata.get(&GlobalId::System(table_key.global_id))
                == Some(&shard_id)
            {
                migrated_shard_updates.push(((table_key, shard_id.clone()), upper, -1));
            } else {
                migration_shards_to_finalize.insert((table_key, shard_id));
            }
        }
    }

    // 5. Add migrated tables to migration shard for current build version.
    let mut global_id_shards: BTreeMap<_, _> = global_id_shards
        .into_iter()
        .map(|(table_key, shard_id)| (table_key.global_id, shard_id))
        .collect();
    for global_id in migrated_storage_collections {
        if !global_id_shards.contains_key(&global_id) {
            let shard_id = ShardId::new();
            global_id_shards.insert(global_id, shard_id);
            let table_key = TableKey {
                global_id,
                build_version: build_version.clone(),
            };
            migrated_shard_updates.push(((table_key, shard_id), upper, 1));
        }
    }

    // It's very important that we use the same `upper` that was used to read in a snapshot of the
    // shard. If someone updated the shard after we read then this write will fail.
    let upper = if !migrated_shard_updates.is_empty() {
        write_to_migration_shard(
            migrated_shard_updates,
            upper,
            &mut write_handle,
            &mut since_handle,
        )
        .await?
    } else {
        upper
    };

    // 6. Update the `GlobalId` to `ShardId` mapping and register old `ShardId`s for
    // finalization. We don't do the finalization here and instead rely on the background
    // finalization task.
    let migrated_storage_collections_0dt = {
        let txn: &mut dyn StorageTxn<Timestamp> = txn;
        let storage_collection_metadata = txn.get_collection_metadata();
        let global_id_shards: BTreeMap<_, _> = global_id_shards
            .into_iter()
            .map(|(global_id, shard_id)| (GlobalId::System(global_id), shard_id))
            .filter(|(global_id, shard_id)| {
                storage_collection_metadata.get(global_id) != Some(shard_id)
            })
            .collect();
        let global_ids: BTreeSet<_> = global_id_shards.keys().cloned().collect();
        let mut old_shard_ids: BTreeSet<_> = txn
            .delete_collection_metadata(global_ids.clone())
            .into_iter()
            .map(|(_, shard_id)| shard_id)
            .collect();
        old_shard_ids.extend(
            migration_shards_to_finalize
                .iter()
                .map(|(_, shard_id)| shard_id),
        );
        txn.insert_unfinalized_shards(old_shard_ids).map_err(|e| {
            Error::new(ErrorKind::Internal(format!(
                "builtin migration failed: {e}"
            )))
        })?;
        txn.insert_collection_metadata(global_id_shards)
            .map_err(|e| {
                Error::new(ErrorKind::Internal(format!(
                    "builtin migration failed: {e}"
                )))
            })?;
        global_ids
    };

    // 7. Map the migrated `GlobalId`s to their corresponding `CatalogItemId`.
    let migrated_storage_collections_0dt = migrated_storage_collections_0dt
        .into_iter()
        .map(|gid| state.get_entry_by_global_id(&gid).id())
        .collect();

    let updates = txn.get_and_commit_op_updates();
    let builtin_table_updates = state
        .apply_updates_for_bootstrap(updates, local_expr_cache)
        .await;

    let cleanup_action = async move {
        if !read_only {
            let updates: Vec<_> = migration_shards_to_finalize
                .into_iter()
                .map(|(table_key, shard_id)| ((table_key, shard_id), upper, -1))
                .collect();
            if !updates.is_empty() {
                // Ignore any errors, these shards will get cleaned up in the next upgrade.
                // It's important to use `upper` here. If there was another concurrent write at
                // `upper`, then `updates` are no longer valid.
                let res =
                    write_to_migration_shard(updates, upper, &mut write_handle, &mut since_handle)
                        .await;
                if let Err(e) = res {
                    error!("Unable to remove old entries from migration shard: {e:?}");
                }
            }
        }
    }
    .boxed();

    Ok(BuiltinItemMigrationResult {
        builtin_table_updates,
        migrated_storage_collections_0dt,
        cleanup_action,
    })
}

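/// Fetches the most recent upper of the migration shard. The shard uses a totally ordered
/// timestamp and is never finalized, so an upper always exists.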
async fn fetch_upper(
    write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
) -> Timestamp {
    write_handle
        .fetch_recent_upper()
        .await
        .as_option()
        .cloned()
        .expect("we use a totally ordered time and never finalize the shard")
}

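/// Appends `updates` to the migration shard at `upper` and advances the shard's upper to
/// `upper + 1`, downgrading the since to lag the new upper by 1 so the shard stays readable.
///
/// Returns the new upper on success, or a fence error if another process wrote at `upper`
/// first.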
async fn write_to_migration_shard(
    updates: Vec<((TableKey, ShardId), Timestamp, StorageDiff)>,
    upper: Timestamp,
    write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
    since_handle: &mut SinceHandle<TableKey, ShardId, Timestamp, StorageDiff, i64>,
) -> Result<Timestamp, Error> {
    let next_upper = upper.step_forward();
    // Lag the since 1 behind the new upper so the shard remains readable.
    let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));
    let next_upper_antichain = Antichain::from_elem(next_upper);

    if let Err(err) = write_handle
        .compare_and_append(updates, Antichain::from_elem(upper), next_upper_antichain)
        .await
        .expect("invalid usage")
    {
        return Err(Error::new(ErrorKind::Durable(DurableCatalogError::Fence(
            FenceError::migration(err),
        ))));
    }

    // The since handle gives us the ability to fence out other downgraders using an opaque token.
    // (See the method documentation for details.)
    // That's not needed here, so we pass the since handle's own opaque token to avoid any
    // comparison failures.
    let opaque = *since_handle.opaque();
    let downgrade = since_handle
        .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
        .await;
    match downgrade {
        None => {}
        Some(Err(e)) => soft_panic_or_log!("found opaque value {e}, but expected {opaque}"),
        Some(Ok(updated)) => soft_assert_or_log!(
            updated == downgrade_to,
            "updated bound ({updated:?}) should match expected ({downgrade_to:?})"
        ),
    }

    Ok(next_upper)
}

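/// Persist `Codec` and `Schema` implementations for the migration shard's `TableKey` keys.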
mod persist_schema {
    use std::num::ParseIntError;

    use arrow::array::{StringArray, StringBuilder};
    use bytes::{BufMut, Bytes};
    use mz_persist_types::Codec;
    use mz_persist_types::codec_impls::{
        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
    };
    use mz_persist_types::columnar::Schema;
    use mz_persist_types::stats::NoneStats;

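    /// A key in the migration shard: the raw system `GlobalId` of a migrated builtin table,
    /// paired with the build version that performed the migration.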
    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)]
    pub(super) struct TableKey {
        pub(super) global_id: u64,
        pub(super) build_version: semver::Version,
    }

    impl std::fmt::Display for TableKey {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "{}-{}", self.global_id, self.build_version)
        }
    }

    impl std::str::FromStr for TableKey {
        type Err = String;

        fn from_str(s: &str) -> Result<Self, Self::Err> {
            let parts: Vec<_> = s.splitn(2, '-').collect();
            let &[global_id, build_version] = parts.as_slice() else {
                return Err(format!("invalid TableKey '{s}'"));
            };
            let global_id = global_id
                .parse()
                .map_err(|e: ParseIntError| e.to_string())?;
            let build_version = build_version
                .parse()
                .map_err(|e: semver::Error| e.to_string())?;
            Ok(TableKey {
                global_id,
                build_version,
            })
        }
    }

    impl From<TableKey> for String {
        fn from(table_key: TableKey) -> Self {
            table_key.to_string()
        }
    }

    impl TryFrom<String> for TableKey {
        type Error = String;

        fn try_from(s: String) -> Result<Self, Self::Error> {
            s.parse()
        }
    }

    impl Default for TableKey {
        fn default() -> Self {
            Self {
                global_id: Default::default(),
                build_version: semver::Version::new(0, 0, 0),
            }
        }
    }

    impl Codec for TableKey {
        type Storage = ();
        type Schema = TableKeySchema;
        fn codec_name() -> String {
            "TableKey".into()
        }
        fn encode<B: BufMut>(&self, buf: &mut B) {
            buf.put(self.to_string().as_bytes())
        }
        fn decode<'a>(buf: &'a [u8], _schema: &TableKeySchema) -> Result<Self, String> {
            let table_key = String::from_utf8(buf.to_owned()).map_err(|err| err.to_string())?;
            table_key.parse()
        }
        fn encode_schema(_schema: &Self::Schema) -> Bytes {
            Bytes::new()
        }
        fn decode_schema(buf: &Bytes) -> Self::Schema {
            assert_eq!(*buf, Bytes::new());
            TableKeySchema
        }
    }

    impl SimpleColumnarData for TableKey {
        type ArrowBuilder = StringBuilder;
        type ArrowColumn = StringArray;

        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
            builder.values_slice().len()
        }

        fn push(&self, builder: &mut Self::ArrowBuilder) {
            builder.append_value(&self.to_string());
        }
        fn push_null(builder: &mut Self::ArrowBuilder) {
            builder.append_null();
        }
        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
            *self = column.value(idx).parse().expect("should be valid TableKey");
        }
    }

    /// An implementation of [Schema] for [TableKey].
    #[derive(Debug, PartialEq)]
    pub(super) struct TableKeySchema;

    impl Schema<TableKey> for TableKeySchema {
        type ArrowColumn = StringArray;
        type Statistics = NoneStats;

        type Decoder = SimpleColumnarDecoder<TableKey>;
        type Encoder = SimpleColumnarEncoder<TableKey>;

        fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
            Ok(SimpleColumnarEncoder::default())
        }

        fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
            Ok(SimpleColumnarDecoder::new(col))
        }
    }
}
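
// A minimal round-trip sketch for `TableKey`'s string codec; the IDs and versions below are
// made up for illustration, and a real test in this repo might use its own test macros.
#[cfg(test)]
mod tests {
    use super::persist_schema::TableKey;

    #[test]
    fn table_key_string_roundtrip() {
        // Pre-release versions contain '-', which is why `TableKey::from_str` splits on only
        // the first '-' (via `splitn(2, '-')`).
        for version in ["0.1.0", "0.130.0-dev.0"] {
            let key = TableKey {
                global_id: 42,
                build_version: version.parse().expect("valid semver"),
            };
            let encoded: String = key.clone().into();
            let decoded: TableKey = encoded.parse().expect("should round-trip");
            assert_eq!(key, decoded);
        }
    }
}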
621}