mz_ore/id_gen.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! ID generation utilities.

use hibitset::BitSet;
use std::borrow::Borrow;
use std::fmt;
use std::hash::Hash;
use std::marker::PhantomData;
use std::ops::{AddAssign, Sub};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use uuid::Uuid;

use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

use crate::cast::CastFrom;

/// Manages the allocation of unique IDs.
#[derive(Debug, Clone)]
pub struct Gen<Id> {
    id: u64,
    phantom: PhantomData<Id>,
}

impl<Id> Default for Gen<Id> {
    fn default() -> Self {
        Self {
            id: 0,
            phantom: PhantomData,
        }
    }
}

impl<Id: From<u64>> Gen<Id> {
    /// Allocates a new identifier of type `Id` and advances the generator.
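    ///
    /// A minimal usage sketch (the values assume a freshly created generator):
    ///
    /// ```
    /// use mz_ore::id_gen::IdGen;
    ///
    /// let mut gen = IdGen::default();
    /// assert_eq!(gen.allocate_id(), 0);
    /// assert_eq!(gen.allocate_id(), 1);
    /// ```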
    pub fn allocate_id(&mut self) -> Id {
        let id = self.id;
        self.id += 1;
        id.into()
    }
}

/// A generator of `u64` IDs.
pub type IdGen = Gen<u64>;

/// Manages the allocation of unique IDs.
///
/// Atomic version of `Gen`, for sharing between threads.
#[derive(Debug)]
pub struct AtomicGen<Id> {
    id: AtomicU64,
    phantom: PhantomData<Id>,
}

impl<Id> Default for AtomicGen<Id> {
    fn default() -> Self {
        Self {
            id: AtomicU64::new(0),
            phantom: PhantomData,
        }
    }
}

impl<Id: From<u64> + Default> AtomicGen<Id> {
    /// Allocates a new identifier of type `Id` and advances the generator.
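    ///
    /// A minimal usage sketch; unlike [`Gen`], allocation only needs `&self`:
    ///
    /// ```
    /// use mz_ore::id_gen::AtomicIdGen;
    ///
    /// let gen = AtomicIdGen::default();
    /// assert_eq!(gen.allocate_id(), 0);
    /// assert_eq!(gen.allocate_id(), 1);
    /// ```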
    pub fn allocate_id(&self) -> Id {
        // The only purpose of the atomic here is to ensure every caller receives a distinct ID;
        // there are no requirements on the order in which IDs are produced and there is no other
        // state protected by this atomic. `Relaxed` ordering is therefore sufficient.
        let id = self.id.fetch_add(1, Ordering::Relaxed);
        id.into()
    }
}

/// A generator of `u64` IDs.
///
/// Atomic version of `IdGen`, for sharing between threads.
pub type AtomicIdGen = AtomicGen<u64>;

/// Common trait bounds required of IDs managed by an [`IdAllocator`].
pub trait IdGenerator:
    From<u8> + AddAssign + Sub + PartialOrd + Copy + Eq + Hash + Ord + serde::Serialize + fmt::Display
{
}

impl<T> IdGenerator for T where
    T: From<u8>
        + AddAssign
        + Sub
        + PartialOrd
        + Copy
        + Eq
        + Hash
        + Ord
        + serde::Serialize
        + fmt::Display
{
}

/// Manages allocation of numeric IDs.
#[derive(Debug, Clone)]
pub struct IdAllocator<A: IdAllocatorInner>(pub Arc<Mutex<A>>);

/// Common trait for id allocators.
pub trait IdAllocatorInner: std::fmt::Debug + Send {
    /// Name of the allocator.
    const NAME: &'static str;
    /// Construct an allocator with the given range. Returned ids will be OR'd with `mask`. `mask`
    /// must not have any bits that could be set by a number <= `max`.
    fn new(min: u32, max: u32, mask: u32) -> Self;
    /// Allocate a new id.
    fn alloc(&mut self) -> Option<u32>;
    /// Deallocate a used id, making it available for reuse.
    fn remove(&mut self, id: u32);
}

/// IdAllocator using a HiBitSet.
#[derive(Debug, Clone)]
pub struct IdAllocatorInnerBitSet {
    next: StdRng,
    min: u32,
    max: u32,
    mask: u32,
    used: BitSet,
}

impl IdAllocatorInner for IdAllocatorInnerBitSet {
    const NAME: &'static str = "hibitset";

    fn new(min: u32, max: u32, mask: u32) -> Self {
        let total = usize::cast_from(max - min);
        assert!(total < BitSet::BITS_PER_USIZE.pow(4));
        IdAllocatorInnerBitSet {
            next: StdRng::from_entropy(),
            min,
            max,
            mask,
            used: BitSet::new(),
        }
    }

    fn alloc(&mut self) -> Option<u32> {
        let range = self.min..=self.max;
        let init = self.next.gen_range(range);
        let mut next = init;
        loop {
            // Because hibitset has a hard maximum of 64**4 (~16 million), subtract the min in case
            // max is above that. This is safe because we already asserted above that `max - min <
            // 64**4`.
            let stored = next - self.min;
            if !self.used.add(stored) {
                assert!(
                    next & self.mask == 0,
                    "chosen ID must not intersect with mask:\n{:#034b}\n{:#034b}",
                    next,
                    self.mask
                );
                return Some(next | self.mask);
            }
            // The value is already used; increment and try again, wrapping back to min once we
            // hit max.
            next = if next == self.max { self.min } else { next + 1 };
            // We fully wrapped around, so every slot is in use. BitSet doesn't have a rank or
            // count method, so we can't detect exhaustion any earlier.
            if next == init {
                return None;
            }
        }
    }

    fn remove(&mut self, id: u32) {
        let id = (!self.mask) & id;
        let stored = id - self.min;
        self.used.remove(stored);
    }
}

impl<A: IdAllocatorInner> IdAllocator<A> {
    /// Creates a new `IdAllocator` that will assign IDs between `min` and
    /// `max`, both inclusive.
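    ///
    /// A minimal sketch of the `mask` behavior, using the hibitset-backed allocator: allocated
    /// IDs have the mask bits OR'd in.
    ///
    /// ```
    /// use mz_ore::id_gen::{IdAllocator, IdAllocatorInnerBitSet};
    ///
    /// let ida = IdAllocator::<IdAllocatorInnerBitSet>::new(1, 1, 0xfff << 20);
    /// let id = ida.alloc().expect("range not exhausted");
    /// assert_eq!(id.unhandled(), (0xfff << 20) | 1);
    /// ```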
    pub fn new(min: u32, max: u32, mask: u32) -> IdAllocator<A> {
        assert!(min <= max);
        if mask != 0 && max > 0 {
            // mask_check has a 1 in every bit that could be set by a number <= max. Assert that
            // the mask doesn't share any bits with those.
            let mask_check = (1 << (max.ilog2() + 1)) - 1;
            assert_eq!(mask & mask_check, 0, "max and mask share bits");
        }
        let inner = A::new(min, max, mask);
        IdAllocator(Arc::new(Mutex::new(inner)))
    }

    /// Allocates a new ID randomly distributed between min and max.
    ///
    /// Returns `None` if the allocator is exhausted.
    ///
    /// The ID associated with the [`IdHandle`] will be freed when all of the
    /// outstanding [`IdHandle`]s have been dropped.
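    ///
    /// A minimal sketch of allocating from a small range, using the hibitset-backed allocator:
    ///
    /// ```
    /// use mz_ore::id_gen::{IdAllocator, IdAllocatorInnerBitSet};
    ///
    /// let ida = IdAllocator::<IdAllocatorInnerBitSet>::new(3, 5, 0);
    /// let id = ida.alloc().expect("range not exhausted");
    /// assert!((3..=5).contains(&id.unhandled()));
    /// ```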
    pub fn alloc(&self) -> Option<IdHandle<u32, A>> {
        let inner = Arc::new(internal::IdHandleInner::new(self)?);
        Some(IdHandle::Dynamic(inner))
    }

    // Attempt to allocate a new ID. We want the ID to be randomly distributed in the range. To do
    // this, choose a random candidate, check whether it's already in the used set, and increment
    // in a loop until an unused slot is found. Ideally we could ask the used set what its next
    // used or unused id is after some given X, but that's not part of its API. This means that
    // when the range is large and the used set is nearly full, we will spend a lot of cycles
    // looking for an open slot. However, we limit the number of connections to the thousands, and
    // connection IDs have 20 bits (~1 million) of space, so it is not currently possible to enter
    // that state.
    fn alloc_internal(&self) -> Option<u32> {
        let mut inner = self.0.lock().expect("lock poisoned");
        inner.alloc()
    }

    fn free_internal(&self, id: u32) {
        let mut inner = self.0.lock().expect("lock poisoned");
        inner.remove(id);
    }
}

/// A cloneable, owned reference to an ID.
///
/// Once all of the [`IdHandle`]s referencing an ID have been dropped, the ID is freed for later
/// re-use.
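///
/// A minimal sketch of the reference-counting behavior, using the hibitset-backed allocator and
/// a single-slot range:
///
/// ```
/// use mz_ore::id_gen::{IdAllocator, IdAllocatorInnerBitSet};
///
/// let ida = IdAllocator::<IdAllocatorInnerBitSet>::new(7, 7, 0);
/// let id = ida.alloc().expect("one slot available");
/// let id_clone = id.clone();
/// // The ID stays allocated while any handle is live, so the single slot is exhausted.
/// drop(id);
/// assert!(ida.alloc().is_none());
/// // Dropping the last handle frees the ID for re-use.
/// drop(id_clone);
/// assert!(ida.alloc().is_some());
/// ```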
#[derive(Debug)]
pub enum IdHandle<T, A: IdAllocatorInner> {
    /// An ID "allocated" at compile time.
    ///
    /// Note: It is *entirely* up to the caller to make sure the provided ID is
    /// not used by a dynamic ID allocator.
    Static(T),
    /// An ID allocated at runtime, gets freed once all handles have been dropped.
    Dynamic(Arc<internal::IdHandleInner<T, A>>),
}

impl<T: Clone, A: IdAllocatorInner> Clone for IdHandle<T, A> {
    fn clone(&self) -> Self {
        match self {
            IdHandle::Static(t) => IdHandle::Static(t.clone()),
            IdHandle::Dynamic(handle) => IdHandle::Dynamic(Arc::clone(handle)),
        }
    }
}

impl<T: IdGenerator, A: IdAllocatorInner> IdHandle<T, A> {
    /// Returns the raw ID inside of this handle.
    ///
    /// Use with caution! It is easy for a raw ID to outlive the handle from
    /// which it came. You are responsible for ensuring that your use of the raw
    /// ID does not lead to ID reuse bugs.
    pub fn unhandled(&self) -> T {
        *self.borrow()
    }
}

impl<T: IdGenerator, A: IdAllocatorInner> PartialEq for IdHandle<T, A> {
    fn eq(&self, other: &Self) -> bool {
        self.unhandled() == other.unhandled()
    }
}
impl<T: IdGenerator, A: IdAllocatorInner> Eq for IdHandle<T, A> {}

impl<T: IdGenerator, A: IdAllocatorInner> PartialOrd for IdHandle<T, A> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: IdGenerator, A: IdAllocatorInner> Ord for IdHandle<T, A> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.unhandled().cmp(&other.unhandled())
    }
}

impl<T, A: IdAllocatorInner> Borrow<T> for IdHandle<T, A> {
    fn borrow(&self) -> &T {
        match self {
            IdHandle::Static(id) => id,
            IdHandle::Dynamic(inner) => &inner.id,
        }
    }
}

impl<T: IdGenerator, A: IdAllocatorInner> fmt::Display for IdHandle<T, A> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.unhandled().fmt(f)
    }
}

impl<T: IdGenerator, A: IdAllocatorInner> serde::Serialize for IdHandle<T, A> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        self.unhandled().serialize(serializer)
    }
}

mod internal {
    use std::fmt::Debug;
    use std::sync::Arc;

    use crate::cast::CastFrom;
    use crate::id_gen::{IdAllocator, IdAllocatorInner};

    pub struct IdHandleInner<T, A: IdAllocatorInner> {
        /// A handle to the [`IdAllocator`] used to allocate the provided id.
        pub(super) allocator: IdAllocator<A>,
        /// The actual ID that was allocated.
        pub(super) id: T,
        stored: u32,
    }

    impl<T: Debug, A: IdAllocatorInner> Debug for IdHandleInner<T, A> {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("IdHandleInner")
                .field("id", &self.id)
                .field("stored", &self.stored)
                .finish_non_exhaustive()
        }
    }

    impl<T, A: IdAllocatorInner> IdHandleInner<T, A>
    where
        T: CastFrom<u32>,
    {
        pub fn new(allocator: &IdAllocator<A>) -> Option<Self> {
            let stored = allocator.alloc_internal()?;
            Some(IdHandleInner {
                allocator: IdAllocator(Arc::clone(&allocator.0)),
                id: T::cast_from(stored),
                stored,
            })
        }
    }

    impl<T, A: IdAllocatorInner> Drop for IdHandleInner<T, A> {
        fn drop(&mut self) {
            // Release our ID for later re-use.
            self.allocator.free_internal(self.stored);
        }
    }
}

/// Number of bits the org id is offset into a connection id.
pub const ORG_ID_OFFSET: usize = 19;

/// Max (inclusive) connection id that can be produced.
pub const MAX_ORG_ID: u32 = (1 << ORG_ID_OFFSET) - 1;

/// Extracts the lower 12 bits of an org id. These are later placed in the upper bits of a
/// connection id (shifted left by [`ORG_ID_OFFSET`]) to help route cancellation requests.
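///
/// A minimal sketch of the round trip through [`conn_id_org_uuid`] (the org id below is an
/// arbitrary example value):
///
/// ```
/// use mz_ore::id_gen::{conn_id_org_uuid, org_id_conn_bits};
/// use uuid::Uuid;
///
/// let org_id = Uuid::parse_str("9e37ec59-56f4-450a-acbd-18ff14f10ca8").unwrap();
/// let conn_bits = org_id_conn_bits(&org_id);
/// // The last three hex digits of the org id survive the round trip.
/// assert_eq!(conn_id_org_uuid(conn_bits), "CA8");
/// ```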
pub fn org_id_conn_bits(uuid: &Uuid) -> u32 {
    let lower = uuid.as_u128();
    let lower = (lower & 0xFFF) << ORG_ID_OFFSET;
    let lower: u32 = lower.try_into().expect("must fit");
    lower
}

/// Returns the portion of the org's UUID present in a connection id.
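///
/// A minimal sketch (`0xca8` stands in for the low 12 bits of an org id):
///
/// ```
/// use mz_ore::id_gen::{conn_id_org_uuid, ORG_ID_OFFSET};
///
/// let conn_id: u32 = 0xca8 << ORG_ID_OFFSET;
/// assert_eq!(conn_id_org_uuid(conn_id), "CA8");
/// ```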
pub fn conn_id_org_uuid(conn_id: u32) -> String {
    const UPPER: [char; 16] = [
        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F',
    ];

    // Extract UUID from conn_id: upper 12 bits excluding the first.
    let orgid = usize::try_from((conn_id >> ORG_ID_OFFSET) & 0xFFF).expect("must cast");
    // Convert the bits into a 3 char string and inject into the resolver template.
    let mut dst = String::with_capacity(3);
    dst.push(UPPER[(orgid >> 8) & 0xf]);
    dst.push(UPPER[(orgid >> 4) & 0xf]);
    dst.push(UPPER[orgid & 0xf]);
    dst
}

/// Generate a random temporary ID.
///
/// Concretely we generate a UUIDv4 and return the last 12 characters for maximum uniqueness.
///
/// Note: the reason we use the last 12 characters is because the bits 6, 7, and 12 - 15
/// are all hard coded <https://www.rfc-editor.org/rfc/rfc4122#section-4.4>.
/// ```
/// use mz_ore::id_gen::temp_id;
///
/// let temp = temp_id();
/// assert_eq!(temp.len(), 12);
/// assert!(temp.is_ascii());
/// ```
pub fn temp_id() -> String {
    let temp_uuid = uuid::Uuid::new_v4().as_hyphenated().to_string();
    temp_uuid.chars().rev().take_while(|c| *c != '-').collect()
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use crate::assert_none;

    use super::*;

    #[crate::test]
    fn test_conn_org() {
        let uuid = Uuid::parse_str("9e37ec59-56f4-450a-acbd-18ff14f10ca8").unwrap();
        let lower = org_id_conn_bits(&uuid);
        let org_lower_uuid = conn_id_org_uuid(lower);
        assert_eq!(org_lower_uuid, "CA8");
    }

    #[crate::test]
    fn test_id_gen() {
        test_ad_allocator::<IdAllocatorInnerBitSet>();
    }

    // Test that a max and mask that intersect cause a panic.
    #[crate::test]
    #[should_panic]
    fn test_mask_intersect() {
        let env_lower = org_id_conn_bits(&uuid::Uuid::from_u128(u128::MAX));
        let ida = IdAllocator::<IdAllocatorInnerBitSet>::new(
            1 << ORG_ID_OFFSET,
            1 << ORG_ID_OFFSET,
            env_lower,
        );
        let id = ida.alloc().unwrap();
        assert_eq!(id.unhandled(), (0xfff << ORG_ID_OFFSET) | MAX_ORG_ID);
    }

    fn test_ad_allocator<A: IdAllocatorInner>() {
        test_id_alloc::<A>();
        test_static_id_sorting::<A>();
        test_id_reuse::<A>();
        test_display::<A>();
        test_map_lookup::<A>();
        test_serialization::<A>();
        test_mask::<A>();
        test_mask_envd::<A>();
    }

    fn test_mask<A: IdAllocatorInner>() {
        let ida = IdAllocator::<A>::new(1, 1, 0xfff << 20);
        let id = ida.alloc().unwrap();
        assert_eq!(id.unhandled(), (0xfff << 20) | 1);
    }

    // Test that the random conn id and the uuid, each with all bits set, don't intersect.
    fn test_mask_envd<A: IdAllocatorInner>() {
        let env_lower = org_id_conn_bits(&uuid::Uuid::from_u128(u128::MAX));
        let ida = IdAllocator::<A>::new(MAX_ORG_ID, MAX_ORG_ID, env_lower);
        let id = ida.alloc().unwrap();
        assert_eq!(id.unhandled(), (0xfff << ORG_ID_OFFSET) | MAX_ORG_ID);
    }

    fn test_id_alloc<A: IdAllocatorInner>() {
        let ida = IdAllocator::<A>::new(3, 5, 0);
        let id3 = ida.alloc().unwrap();
        let id4 = ida.alloc().unwrap();
        let id5 = ida.alloc().unwrap();
        assert_ne!(id3, id4);
        assert_ne!(id3, id5);
        assert_ne!(id4, id5);
        drop(id4);
        let _id4 = ida.alloc().unwrap();
        drop(id5);
        drop(id3);
        let _id5 = ida.alloc().unwrap();
        let _id3 = ida.alloc().unwrap();
        match ida.alloc() {
            Some(id) => panic!(
                "id allocator returned {}, not expected id exhaustion error",
                id
            ),
            None => (),
        }
    }

    fn test_static_id_sorting<A: IdAllocatorInner>() {
        let ida = IdAllocator::<A>::new(0, 0, 0);
        let id0 = ida.alloc().unwrap();
        let id1 = IdHandle::Static(1);
        assert!(id0 < id1);

        let ida = IdAllocator::<A>::new(1, 1, 0);
        let id0 = IdHandle::Static(0);
        let id1 = ida.alloc().unwrap();
        assert!(id0 < id1);
    }

    fn test_id_reuse<A: IdAllocatorInner>() {
        let allocator = IdAllocator::<A>::new(10, 11, 0);

        let id_a = allocator.alloc().unwrap();
        let a = id_a.unhandled();
        let id_a_clone = id_a.clone();
        // a should not get freed.
        drop(id_a);

        // There are only two slots, so trying to allocate 2 more should fail the second time.
        let _id_b = allocator.alloc().unwrap();
        assert_none!(allocator.alloc());

        // a should get freed since all outstanding references have been dropped.
        drop(id_a_clone);

        // We should re-use a.
        let id_c = allocator.alloc().unwrap();
        assert_eq!(id_c.unhandled(), a);
    }

    fn test_display<A: IdAllocatorInner>() {
        let allocator = IdAllocator::<A>::new(65_000, 65_000, 0);

        let id_a = allocator.alloc().unwrap();
        assert_eq!(id_a.unhandled(), 65_000);

        // An IdHandle should use the inner type's Display impl.
        let id_display = format!("{id_a}");
        let val_display = format!("{}", id_a.unhandled());

        assert_eq!(id_display, val_display);
    }

    fn test_map_lookup<A: IdAllocatorInner>() {
        let allocator = IdAllocator::<A>::new(99, 101, 0);

        let id_a = allocator.alloc().unwrap();
        let a = id_a.unhandled();

        let mut btree = BTreeMap::new();
        btree.insert(id_a, "hello world");

        // We should be able to lookup an IdHandle, based on just the value.
        let entry = btree.remove(&a).unwrap();
        assert_eq!(entry, "hello world");

        assert!(btree.is_empty());
    }

    fn test_serialization<A: IdAllocatorInner>() {
        let allocator = IdAllocator::<A>::new(42, 42, 0);

        let id_a = allocator.alloc().unwrap();
        assert_eq!(id_a.unhandled(), 42);

        // An IdHandle should serialize the same as the inner value.
        let id_json = serde_json::to_string(&id_a).unwrap();
        let val_json = serde_json::to_string(&id_a.unhandled()).unwrap();

        assert_eq!(id_json, val_json);
    }
}