
mz_ore/id_gen.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! ID generation utilities.

use hibitset::BitSet;
use serde::{Serialize, Serializer};
use std::borrow::Borrow;
use std::fmt;
use std::hash::Hash;
use std::marker::PhantomData;
use std::ops::{AddAssign, Sub};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use uuid::Uuid;

use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

use crate::cast::CastFrom;

/// Manages the allocation of unique IDs.
#[derive(Debug, Clone)]
pub struct Gen<Id> {
    id: u64,
    phantom: PhantomData<Id>,
}

impl<Id> Default for Gen<Id> {
    fn default() -> Self {
        Self {
            id: 0,
            phantom: PhantomData,
        }
    }
}

impl<Id: From<u64>> Gen<Id> {
    /// Allocates a new identifier of type `Id` and advances the generator.
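    ///
    /// A minimal usage sketch (illustrative, not part of the original source), using the
    /// `IdGen` alias defined below:
    /// ```
    /// use mz_ore::id_gen::IdGen;
    ///
    /// let mut gen = IdGen::default();
    /// // IDs are handed out sequentially, starting at zero.
    /// assert_eq!(gen.allocate_id(), 0);
    /// assert_eq!(gen.allocate_id(), 1);
    /// ```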
    pub fn allocate_id(&mut self) -> Id {
        let id = self.id;
        self.id += 1;
        id.into()
    }
}

/// A generator of `u64` IDs.
pub type IdGen = Gen<u64>;

/// Manages the allocation of unique IDs.
///
/// Atomic version of `Gen`, for sharing between threads.
#[derive(Debug)]
pub struct AtomicGen<Id> {
    id: AtomicU64,
    phantom: PhantomData<Id>,
}

impl<Id> Default for AtomicGen<Id> {
    fn default() -> Self {
        Self {
            id: AtomicU64::new(0),
            phantom: PhantomData,
        }
    }
}

impl<Id: From<u64> + Default> AtomicGen<Id> {
    /// Allocates a new identifier of type `Id` and advances the generator.
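    ///
    /// A minimal usage sketch (illustrative, not part of the original source), using the
    /// `AtomicIdGen` alias defined below:
    /// ```
    /// use mz_ore::id_gen::AtomicIdGen;
    ///
    /// let gen = AtomicIdGen::default();
    /// // Every call returns a distinct ID, even without `&mut` access.
    /// assert_ne!(gen.allocate_id(), gen.allocate_id());
    /// ```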
    pub fn allocate_id(&self) -> Id {
        // The only purpose of the atomic here is to ensure every caller receives a distinct ID;
        // there are no requirements on the order in which IDs are produced and there is no other
        // state protected by this atomic. `Relaxed` ordering is therefore sufficient.
        let id = self.id.fetch_add(1, Ordering::Relaxed);
        id.into()
    }
}

/// A generator of `u64` IDs.
///
/// Atomic version of `IdGen`, for sharing between threads.
pub type AtomicIdGen = AtomicGen<u64>;

/// IdAllocator common traits.
pub trait IdGenerator:
    From<u8> + AddAssign + Sub + PartialOrd + Copy + Eq + Hash + Ord + Serialize + fmt::Display
{
}

impl<T> IdGenerator for T where
    T: From<u8> + AddAssign + Sub + PartialOrd + Copy + Eq + Hash + Ord + Serialize + fmt::Display
{
}

/// Manages allocation of numeric IDs.
#[derive(Debug, Clone)]
pub struct IdAllocator<A: IdAllocatorInner>(pub Arc<Mutex<A>>);

/// Common trait for id allocators.
pub trait IdAllocatorInner: std::fmt::Debug + Send {
    /// Name of the allocator.
    const NAME: &'static str;
    /// Construct an allocator with the given range. Returned ids will be OR'd with `mask`. `mask`
    /// must not have any bits that could be set by a number <= `max`.
    fn new(min: u32, max: u32, mask: u32) -> Self;
    /// Allocate a new id.
    fn alloc(&mut self) -> Option<u32>;
    /// Deallocate a used id, making it available for reuse.
    fn remove(&mut self, id: u32);
}

/// IdAllocator using a HiBitSet.
#[derive(Debug, Clone)]
pub struct IdAllocatorInnerBitSet {
    next: StdRng,
    min: u32,
    max: u32,
    mask: u32,
    used: BitSet,
}

impl IdAllocatorInner for IdAllocatorInnerBitSet {
    const NAME: &'static str = "hibitset";

    fn new(min: u32, max: u32, mask: u32) -> Self {
        let total = usize::cast_from(max - min);
        assert!(total < BitSet::BITS_PER_USIZE.pow(4));
        IdAllocatorInnerBitSet {
            next: StdRng::from_os_rng(),
            min,
            max,
            mask,
            used: BitSet::new(),
        }
    }

    fn alloc(&mut self) -> Option<u32> {
        let range = self.min..=self.max;
        let init = self.next.random_range(range);
        let mut next = init;
        loop {
            // Because hibitset has a hard maximum of 64**4 (~16 million), subtract the min in case
            // max is above that. This is safe because we already asserted above that `max - min <
            // 64**4`.
            let stored = next - self.min;
            if !self.used.add(stored) {
                assert!(
                    next & self.mask == 0,
                    "chosen ID must not intersect with mask:\n{:#034b}\n{:#034b}",
                    next,
                    self.mask
                );
                return Some(next | self.mask);
            }
            // Existing value, increment and try again. Wrap once we hit max back to min.
            next = if next == self.max { self.min } else { next + 1 };
            // We fully wrapped around. BitSet doesn't have a rank or count method, so we can't
            // compute this early.
            if next == init {
                return None;
            }
        }
    }

    fn remove(&mut self, id: u32) {
        let id = (!self.mask) & id;
        let stored = id - self.min;
        self.used.remove(stored);
    }
}

impl<A: IdAllocatorInner> IdAllocator<A> {
    /// Creates a new `IdAllocator` that will assign IDs between `min` and
    /// `max`, both inclusive.
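    ///
    /// A minimal usage sketch (illustrative, not part of the original source): allocated
    /// IDs are drawn from `[min, max]` and then OR'd with `mask`.
    /// ```
    /// use mz_ore::id_gen::{IdAllocator, IdAllocatorInnerBitSet};
    ///
    /// let allocator = IdAllocator::<IdAllocatorInnerBitSet>::new(1, 1, 0xfff << 20);
    /// let id = allocator.alloc().expect("range not exhausted");
    /// // The only available value, 1, is combined with the mask bits.
    /// assert_eq!(id.unhandled(), (0xfff << 20) | 1);
    /// ```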
    pub fn new(min: u32, max: u32, mask: u32) -> IdAllocator<A> {
        assert!(min <= max);
        if mask != 0 && max > 0 {
            // mask_check is all 1s in any bit that could be set by a number <= max. Assert that
            // the mask doesn't share any bits with those.
            let mask_check = (1 << (max.ilog2() + 1)) - 1;
            assert_eq!(mask & mask_check, 0, "max and mask share bits");
        }
        let inner = A::new(min, max, mask);
        IdAllocator(Arc::new(Mutex::new(inner)))
    }

    /// Allocates a new ID randomly distributed between min and max.
    ///
    /// Returns `None` if the allocator is exhausted.
    ///
    /// The ID associated with the [`IdHandle`] will be freed when all of the
    /// outstanding [`IdHandle`]s have been dropped.
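    ///
    /// A minimal usage sketch (illustrative, not part of the original source):
    /// ```
    /// use mz_ore::id_gen::{IdAllocator, IdAllocatorInnerBitSet};
    ///
    /// let allocator = IdAllocator::<IdAllocatorInnerBitSet>::new(3, 5, 0);
    /// let id = allocator.alloc().expect("range not exhausted");
    /// // With a zero mask, the allocated ID is simply a value in [3, 5].
    /// assert!(id.unhandled() >= 3 && id.unhandled() <= 5);
    /// ```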
    pub fn alloc(&self) -> Option<IdHandle<u32, A>> {
        let inner = Arc::new(internal::IdHandleInner::new(self)?);
        Some(IdHandle::Dynamic(inner))
    }

    // Attempt to allocate a new ID. We want the ID to be randomly distributed in the range. To do
    // this, choose a random candidate. Check if it's already in the used set, and increment in a
    // loop until an unused slot is found. Ideally we could ask the used set what its next used or
    // unused id is after some given X, but that's not part of the API. This means that when the
    // range is large and the used set is near full, we will spend a lot of cycles looking for an
    // open slot. However, we limit the number of connections to the thousands, and connection IDs
    // have 20 bits (~1 million) of space, so it is not currently possible to enter that state.
    fn alloc_internal(&self) -> Option<u32> {
        let mut inner = self.0.lock().expect("lock poisoned");
        inner.alloc()
    }

    fn free_internal(&self, id: u32) {
        let mut inner = self.0.lock().expect("lock poisoned");
        inner.remove(id);
    }
}

/// A clone-able owned reference to an ID.
///
/// Once all of the [`IdHandle`]s referencing an ID have been dropped, we will then free the ID
/// for later re-use.
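///
/// A minimal usage sketch (illustrative, not part of the original source):
/// ```
/// use mz_ore::id_gen::{IdAllocator, IdAllocatorInnerBitSet, IdHandle};
///
/// let allocator = IdAllocator::<IdAllocatorInnerBitSet>::new(5, 5, 0);
/// let dynamic = allocator.alloc().expect("range not exhausted");
/// // Cloning a handle does not allocate a new ID; both handles refer to the same one.
/// let cloned = dynamic.clone();
/// assert_eq!(dynamic.unhandled(), cloned.unhandled());
/// // A static handle wraps a caller-chosen ID that bypasses the allocator.
/// let fixed: IdHandle<u32, IdAllocatorInnerBitSet> = IdHandle::Static(0);
/// assert_eq!(fixed.unhandled(), 0);
/// ```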
#[derive(Debug)]
pub enum IdHandle<T, A: IdAllocatorInner> {
    /// An ID "allocated" at compile time.
    ///
    /// Note: It is *entirely* up to the caller to make sure the provided ID is
    /// not used by a dynamic ID allocator.
    Static(T),
    /// An ID allocated at runtime, gets freed once all handles have been dropped.
    Dynamic(Arc<internal::IdHandleInner<T, A>>),
}

impl<T: Clone, A: IdAllocatorInner> Clone for IdHandle<T, A> {
    fn clone(&self) -> Self {
        match self {
            IdHandle::Static(t) => IdHandle::Static(t.clone()),
            IdHandle::Dynamic(handle) => IdHandle::Dynamic(Arc::clone(handle)),
        }
    }
}

impl<T: IdGenerator, A: IdAllocatorInner> IdHandle<T, A> {
    /// Returns the raw ID inside of this handle.
    ///
    /// Use with caution! It is easy for a raw ID to outlive the handle from
    /// which it came. You are responsible for ensuring that your use of the raw
    /// ID does not lead to ID reuse bugs.
    pub fn unhandled(&self) -> T {
        *self.borrow()
    }
}

impl<T: IdGenerator, A: IdAllocatorInner> PartialEq for IdHandle<T, A> {
    fn eq(&self, other: &Self) -> bool {
        self.unhandled() == other.unhandled()
    }
}
impl<T: IdGenerator, A: IdAllocatorInner> Eq for IdHandle<T, A> {}

impl<T: IdGenerator, A: IdAllocatorInner> PartialOrd for IdHandle<T, A> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: IdGenerator, A: IdAllocatorInner> Ord for IdHandle<T, A> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.unhandled().cmp(&other.unhandled())
    }
}

impl<T, A: IdAllocatorInner> Borrow<T> for IdHandle<T, A> {
    fn borrow(&self) -> &T {
        match self {
            IdHandle::Static(id) => id,
            IdHandle::Dynamic(inner) => &inner.id,
        }
    }
}

impl<T: IdGenerator, A: IdAllocatorInner> fmt::Display for IdHandle<T, A> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.unhandled().fmt(f)
    }
}

impl<T: IdGenerator, A: IdAllocatorInner> Serialize for IdHandle<T, A> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        self.unhandled().serialize(serializer)
    }
}

mod internal {
    use std::fmt::Debug;
    use std::sync::Arc;

    use crate::cast::CastFrom;
    use crate::id_gen::{IdAllocator, IdAllocatorInner};

    pub struct IdHandleInner<T, A: IdAllocatorInner> {
        /// A handle to the [`IdAllocator`] used to allocate the provided id.
        pub(super) allocator: IdAllocator<A>,
        /// The actual ID that was allocated.
        pub(super) id: T,
        stored: u32,
    }

    impl<T: Debug, A: IdAllocatorInner> Debug for IdHandleInner<T, A> {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("IdHandleInner")
                .field("id", &self.id)
                .field("stored", &self.stored)
                .finish_non_exhaustive()
        }
    }

    impl<T, A: IdAllocatorInner> IdHandleInner<T, A>
    where
        T: CastFrom<u32>,
    {
        pub fn new(allocator: &IdAllocator<A>) -> Option<Self> {
            let stored = allocator.alloc_internal()?;
            Some(IdHandleInner {
                allocator: IdAllocator(Arc::clone(&allocator.0)),
                id: T::cast_from(stored),
                stored,
            })
        }
    }

    impl<T, A: IdAllocatorInner> Drop for IdHandleInner<T, A> {
        fn drop(&mut self) {
            // Release our ID for later re-use.
            self.allocator.free_internal(self.stored);
        }
    }
}

/// Number of bits the org id is offset into a connection id.
pub const ORG_ID_OFFSET: usize = 19;

/// Max (inclusive) connection id that can be produced.
pub const MAX_ORG_ID: u32 = (1 << ORG_ID_OFFSET) - 1;

/// Extracts the lower 12 bits from an org id. These are later used as the [31, 20] bits of a
/// connection id to help route cancellation requests.
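///
/// A minimal usage sketch (illustrative, not part of the original source):
/// ```
/// use uuid::Uuid;
/// use mz_ore::id_gen::{ORG_ID_OFFSET, org_id_conn_bits};
///
/// let uuid = Uuid::parse_str("9e37ec59-56f4-450a-acbd-18ff14f10ca8").unwrap();
/// // The lower 12 bits of the org id (0xca8) are shifted into the connection id.
/// assert_eq!(org_id_conn_bits(&uuid), 0xca8 << ORG_ID_OFFSET);
/// ```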
pub fn org_id_conn_bits(uuid: &Uuid) -> u32 {
    let lower = uuid.as_u128();
    let lower = (lower & 0xFFF) << ORG_ID_OFFSET;
    let lower: u32 = lower.try_into().expect("must fit");
    lower
}

/// Returns the portion of the org's UUID present in the connection id.
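///
/// A minimal usage sketch (illustrative, not part of the original source):
/// ```
/// use mz_ore::id_gen::{ORG_ID_OFFSET, conn_id_org_uuid};
///
/// // The 12 org-id bits embedded in the connection id render as three uppercase
/// // hex characters.
/// assert_eq!(conn_id_org_uuid(0xca8 << ORG_ID_OFFSET), "CA8");
/// ```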
pub fn conn_id_org_uuid(conn_id: u32) -> String {
    const UPPER: [char; 16] = [
        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F',
    ];

    // Extract UUID from conn_id: upper 12 bits excluding the first.
    let orgid = usize::try_from((conn_id >> ORG_ID_OFFSET) & 0xFFF).expect("must cast");
    // Convert the bits into a 3 char string and inject into the resolver template.
    let mut dst = String::with_capacity(3);
    dst.push(UPPER[(orgid >> 8) & 0xf]);
    dst.push(UPPER[(orgid >> 4) & 0xf]);
    dst.push(UPPER[orgid & 0xf]);
    dst
}

/// Generate a random temporary ID.
///
/// Concretely we generate a UUIDv4 and return the last 12 characters for maximum uniqueness.
///
/// Note: the reason we use the last 12 characters is because the bits 6, 7, and 12 - 15
/// are all hard coded <https://www.rfc-editor.org/rfc/rfc4122#section-4.4>.
/// ```
/// use mz_ore::id_gen::temp_id;
///
/// let temp = temp_id();
/// assert_eq!(temp.len(), 12);
/// assert!(temp.is_ascii());
/// ```
pub fn temp_id() -> String {
    let temp_uuid = uuid::Uuid::new_v4().as_hyphenated().to_string();
    temp_uuid.chars().rev().take_while(|c| *c != '-').collect()
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use crate::assert_none;

    use super::*;

    #[crate::test]
    fn test_conn_org() {
        let uuid = Uuid::parse_str("9e37ec59-56f4-450a-acbd-18ff14f10ca8").unwrap();
        let lower = org_id_conn_bits(&uuid);
        let org_lower_uuid = conn_id_org_uuid(lower);
        assert_eq!(org_lower_uuid, "CA8");
    }

    #[crate::test]
    fn test_id_gen() {
        test_ad_allocator::<IdAllocatorInnerBitSet>();
    }

    // Test that masks and maxes that intersect cause a panic.
    #[crate::test]
    #[should_panic]
    fn test_mask_intersect() {
        let env_lower = org_id_conn_bits(&uuid::Uuid::from_u128(u128::MAX));
        let ida = IdAllocator::<IdAllocatorInnerBitSet>::new(
            1 << ORG_ID_OFFSET,
            1 << ORG_ID_OFFSET,
            env_lower,
        );
        let id = ida.alloc().unwrap();
        assert_eq!(id.unhandled(), (0xfff << ORG_ID_OFFSET) | MAX_ORG_ID);
    }

    fn test_ad_allocator<A: IdAllocatorInner>() {
        test_id_alloc::<A>();
        test_static_id_sorting::<A>();
        test_id_reuse::<A>();
        test_display::<A>();
        test_map_lookup::<A>();
        test_serialization::<A>();
        test_mask::<A>();
        test_mask_envd::<A>();
    }

    fn test_mask<A: IdAllocatorInner>() {
        let ida = IdAllocator::<A>::new(1, 1, 0xfff << 20);
        let id = ida.alloc().unwrap();
        assert_eq!(id.unhandled(), (0xfff << 20) | 1);
    }

    // Test that the random conn id and uuid, each with all bits set, don't intersect.
    fn test_mask_envd<A: IdAllocatorInner>() {
        let env_lower = org_id_conn_bits(&uuid::Uuid::from_u128(u128::MAX));
        let ida = IdAllocator::<A>::new(MAX_ORG_ID, MAX_ORG_ID, env_lower);
        let id = ida.alloc().unwrap();
        assert_eq!(id.unhandled(), (0xfff << ORG_ID_OFFSET) | MAX_ORG_ID);
    }

    fn test_id_alloc<A: IdAllocatorInner>() {
        let ida = IdAllocator::<A>::new(3, 5, 0);
        let id3 = ida.alloc().unwrap();
        let id4 = ida.alloc().unwrap();
        let id5 = ida.alloc().unwrap();
        assert_ne!(id3, id4);
        assert_ne!(id3, id5);
        assert_ne!(id4, id5);
        drop(id4);
        let _id4 = ida.alloc().unwrap();
        drop(id5);
        drop(id3);
        let _id5 = ida.alloc().unwrap();
        let _id3 = ida.alloc().unwrap();
        match ida.alloc() {
            Some(id) => panic!(
                "id allocator returned {}, not expected id exhaustion error",
                id
            ),
            None => (),
        }
    }

    fn test_static_id_sorting<A: IdAllocatorInner>() {
        let ida = IdAllocator::<A>::new(0, 0, 0);
        let id0 = ida.alloc().unwrap();
        let id1 = IdHandle::Static(1);
        assert!(id0 < id1);

        let ida = IdAllocator::<A>::new(1, 1, 0);
        let id0 = IdHandle::Static(0);
        let id1 = ida.alloc().unwrap();
        assert!(id0 < id1);
    }

    fn test_id_reuse<A: IdAllocatorInner>() {
        let allocator = IdAllocator::<A>::new(10, 11, 0);

        let id_a = allocator.alloc().unwrap();
        let a = id_a.unhandled();
        let id_a_clone = id_a.clone();
        // a should not get freed.
        drop(id_a);

        // There are only two slots, so trying to allocate 2 more should fail the second time.
        let _id_b = allocator.alloc().unwrap();
        assert_none!(allocator.alloc());

        // a should get freed since all outstanding references have been dropped.
        drop(id_a_clone);

        // We should re-use a.
        let id_c = allocator.alloc().unwrap();
        assert_eq!(id_c.unhandled(), a);
    }

    fn test_display<A: IdAllocatorInner>() {
        let allocator = IdAllocator::<A>::new(65_000, 65_000, 0);

        let id_a = allocator.alloc().unwrap();
        assert_eq!(id_a.unhandled(), 65_000);

        // An IdHandle should use the inner type's Display impl.
        let id_display = format!("{id_a}");
        let val_display = format!("{}", id_a.unhandled());

        assert_eq!(id_display, val_display);
    }

    fn test_map_lookup<A: IdAllocatorInner>() {
        let allocator = IdAllocator::<A>::new(99, 101, 0);

        let id_a = allocator.alloc().unwrap();
        let a = id_a.unhandled();

        let mut btree = BTreeMap::new();
        btree.insert(id_a, "hello world");

        // We should be able to lookup an IdHandle, based on just the value.
        let entry = btree.remove(&a).unwrap();
        assert_eq!(entry, "hello world");

        assert!(btree.is_empty());
    }

    fn test_serialization<A: IdAllocatorInner>() {
        let allocator = IdAllocator::<A>::new(42, 42, 0);

        let id_a = allocator.alloc().unwrap();
        assert_eq!(id_a.unhandled(), 42);

        // An IdHandle should serialize the same as the inner value.
        let id_json = serde_json::to_string(&id_a).unwrap();
        let val_json = serde_json::to_string(&id_a.unhandled()).unwrap();

        assert_eq!(id_json, val_json);
    }
}