1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::{BTreeMap, BTreeSet};

use differential_dataflow::lattice::Lattice;

use timely::order::TotalOrder;
use timely::progress::Timestamp;

use mz_ore::now::EpochMillis;
use mz_persist_types::Codec64;
use mz_proto::RustType;
use mz_repr::{GlobalId, TimestampManipulation};

use crate::client::{StorageCommand, StorageResponse};
use crate::controller::{ProtoStorageCommand, ProtoStorageResponse};
use crate::types::sources::IngestionDescription;

use super::{
    CollectionDescription, Controller, DataSource, DurableCollectionMetadata, StorageController,
};

impl<T> Controller<T>
where
    T: Timestamp + Lattice + TotalOrder + Codec64 + From<EpochMillis> + TimestampManipulation,
    StorageCommand<T>: RustType<ProtoStorageCommand>,
    StorageResponse<T>: RustType<ProtoStorageResponse>,

    Self: StorageController<Timestamp = T>,
{
    /// Determine the delta between `durable_metadata` and `collections` such
    /// that:
    /// - Each remap collection's data shard is its parent collection's remap
    ///   shard.
    /// - No collection contains a `Some` value for its remap shard.
    ///
    /// Apply this delta using
    /// `Controller::upsert_collection_metadata(durable_metadata, <this
    /// function's return value>)`.
    ///
    /// This approach is safe/backward compatible because:
    /// - Every ingestion collection previously had a remap shard
    /// - Every ingestion collection now has a progress collection subsource,
    ///   whose data shard should be the remap shard
    /// - No other type of collection used their remap shards, so dropping them
    ///   entirely is essentially a nop.
    ///
    /// MIGRATION: v0.44
    pub(super) fn remap_shard_migration(
        &mut self,
        durable_metadata: &BTreeMap<GlobalId, DurableCollectionMetadata>,
        collections: &[(GlobalId, CollectionDescription<T>)],
    ) -> BTreeMap<GlobalId, DurableCollectionMetadata> {
        let mut state_to_update = BTreeMap::new();

        let mut progress_collections_pending = BTreeSet::new();

        for (id, desc) in collections {
            // Track that we might be adding metadata for progress collections.
            if matches!(desc.data_source, DataSource::Progress) {
                progress_collections_pending.insert(*id);
            }

            let mut current_metadata = match durable_metadata.get(id) {
                Some(c) => {
                    // If we see any existing collection with a retired remap
                    // shard, we know the migration has already completed.
                    if c.remap_shard.is_none() {
                        return BTreeMap::new();
                    }

                    c.clone()
                }
                // If the item has not yet been added to `durable_metadata`,
                // skip it. If this is a progress collection, we will add it to
                // `durable_metadata` once we see its data collection.
                None => {
                    continue;
                }
            };

            if let DataSource::Ingestion(IngestionDescription {
                remap_collection_id,
                ..
            }) = &desc.data_source
            {
                let current_remap_shard = current_metadata
                    .remap_shard
                    .expect("must have remap shard and must not have been migrated yet");

                // Insert metadata for progress collection.
                state_to_update.insert(
                    *remap_collection_id,
                    DurableCollectionMetadata {
                        remap_shard: None,
                        data_shard: current_remap_shard,
                    },
                );
            };

            // Fixup this collection such that it no longer contains reference
            // to a remap shard; if we needed it for the progress collection,
            // we've already used it by this point.
            current_metadata.remap_shard = None;
            state_to_update.insert(*id, current_metadata);
        }

        assert!(
            progress_collections_pending
                .into_iter()
                .all(|id| state_to_update.contains_key(&id)),
            "all pending progress collections received their parent collection's prev remap \
            shards as their data shard"
        );

        state_to_update
    }
}