brotli/enc/
threading.rs

1use alloc::{Allocator, SliceWrapper, SliceWrapperMut};
2use core::marker::PhantomData;
3use core::ops::Range;
4use core::{any, mem};
5#[cfg(feature = "std")]
6use std;
7
8use super::backward_references::{AnyHasher, BrotliEncoderParams, CloneWithAlloc, UnionHasher};
9use super::encode::{
10    hasher_setup, BrotliEncoderDestroyInstance, BrotliEncoderMaxCompressedSize,
11    BrotliEncoderOperation, SanitizeParams,
12};
13use super::BrotliAlloc;
14use crate::concat::{BroCatli, BroCatliResult};
15use crate::enc::combined_alloc::{alloc_default, allocate};
16use crate::enc::encode::BrotliEncoderStateStruct;
17
18pub type PoisonedThreadError = ();
19
20#[cfg(feature = "std")]
21pub type LowLevelThreadError = std::boxed::Box<dyn any::Any + Send + 'static>;
22#[cfg(not(feature = "std"))]
23pub type LowLevelThreadError = ();
24
25pub trait AnyBoxConstructor {
26    fn new(data: LowLevelThreadError) -> Self;
27}
28
29pub trait Joinable<T: Send + 'static, U: Send + 'static>: Sized {
30    fn join(self) -> Result<T, U>;
31}
32#[derive(Debug)]
33pub enum BrotliEncoderThreadError {
34    InsufficientOutputSpace,
35    ConcatenationDidNotProcessFullFile,
36    ConcatenationError(BroCatliResult),
37    ConcatenationFinalizationError(BroCatliResult),
38    OtherThreadPanic,
39    ThreadExecError(LowLevelThreadError),
40}
41
42impl AnyBoxConstructor for BrotliEncoderThreadError {
43    fn new(data: LowLevelThreadError) -> Self {
44        BrotliEncoderThreadError::ThreadExecError(data)
45    }
46}
47
48pub struct CompressedFileChunk<Alloc: BrotliAlloc + Send + 'static>
49where
50    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
51{
52    data_backing: <Alloc as Allocator<u8>>::AllocatedMemory,
53    data_size: usize,
54}
55pub struct CompressionThreadResult<Alloc: BrotliAlloc + Send + 'static>
56where
57    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
58{
59    compressed: Result<CompressedFileChunk<Alloc>, BrotliEncoderThreadError>,
60    alloc: Alloc,
61}
62pub enum InternalSendAlloc<
63    ReturnVal: Send + 'static,
64    ExtraInput: Send + 'static,
65    Alloc: BrotliAlloc + Send + 'static,
66    Join: Joinable<ReturnVal, BrotliEncoderThreadError>,
67> where
68    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
69{
70    A(Alloc, ExtraInput),
71    Join(Join),
72    SpawningOrJoining(PhantomData<ReturnVal>),
73}
74impl<
75        ReturnVal: Send + 'static,
76        ExtraInput: Send + 'static,
77        Alloc: BrotliAlloc + Send + 'static,
78        Join: Joinable<ReturnVal, BrotliEncoderThreadError>,
79    > InternalSendAlloc<ReturnVal, ExtraInput, Alloc, Join>
80where
81    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
82{
83    fn unwrap_input(&mut self) -> (&mut Alloc, &mut ExtraInput) {
84        match *self {
85            InternalSendAlloc::A(ref mut alloc, ref mut extra) => (alloc, extra),
86            _ => panic!("Bad state for allocator"),
87        }
88    }
89}
90
91pub struct SendAlloc<
92    ReturnValue: Send + 'static,
93    ExtraInput: Send + 'static,
94    Alloc: BrotliAlloc + Send + 'static,
95    Join: Joinable<ReturnValue, BrotliEncoderThreadError>,
96>(pub InternalSendAlloc<ReturnValue, ExtraInput, Alloc, Join>)
97//FIXME pub
98where
99    <Alloc as Allocator<u8>>::AllocatedMemory: Send;
100
101impl<
102        ReturnValue: Send + 'static,
103        ExtraInput: Send + 'static,
104        Alloc: BrotliAlloc + Send + 'static,
105        Join: Joinable<ReturnValue, BrotliEncoderThreadError>,
106    > SendAlloc<ReturnValue, ExtraInput, Alloc, Join>
107where
108    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
109{
110    pub fn new(alloc: Alloc, extra_input: ExtraInput) -> Self {
111        SendAlloc::<ReturnValue, ExtraInput, Alloc, Join>(InternalSendAlloc::A(alloc, extra_input))
112    }
113    pub fn unwrap_or(self, other: Alloc, other_extra: ExtraInput) -> (Alloc, ExtraInput) {
114        match self.0 {
115            InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
116            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
117                (other, other_extra)
118            }
119        }
120    }
121    fn unwrap_view_mut(&mut self) -> (&mut Alloc, &mut ExtraInput) {
122        match self.0 {
123            InternalSendAlloc::A(ref mut alloc, ref mut extra_input) => (alloc, extra_input),
124            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
125                panic!("Item permanently borrowed/leaked")
126            }
127        }
128    }
129    pub fn unwrap(self) -> (Alloc, ExtraInput) {
130        match self.0 {
131            InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
132            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
133                panic!("Item permanently borrowed/leaked")
134            }
135        }
136    }
137    pub fn replace_with_default(&mut self) -> (Alloc, ExtraInput) {
138        match mem::replace(
139            &mut self.0,
140            InternalSendAlloc::SpawningOrJoining(PhantomData),
141        ) {
142            InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
143            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
144                panic!("Item permanently borrowed/leaked")
145            }
146        }
147    }
148}
149
150pub enum InternalOwned<T> {
151    // FIXME pub
152    Item(T),
153    Borrowed,
154}
155
156pub struct Owned<T>(pub InternalOwned<T>); // FIXME pub
157impl<T> Owned<T> {
158    pub fn new(data: T) -> Self {
159        Owned::<T>(InternalOwned::Item(data))
160    }
161    pub fn unwrap_or(self, other: T) -> T {
162        if let InternalOwned::Item(x) = self.0 {
163            x
164        } else {
165            other
166        }
167    }
168    pub fn unwrap(self) -> T {
169        if let InternalOwned::Item(x) = self.0 {
170            x
171        } else {
172            panic!("Item permanently borrowed")
173        }
174    }
175    pub fn view(&self) -> &T {
176        if let InternalOwned::Item(ref x) = self.0 {
177            x
178        } else {
179            panic!("Item permanently borrowed")
180        }
181    }
182}
183
184pub trait OwnedRetriever<U: Send + 'static> {
185    fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError>;
186    fn unwrap(self) -> Result<U, PoisonedThreadError>;
187}
188
189#[cfg(feature = "std")]
190impl<U: Send + 'static> OwnedRetriever<U> for std::sync::Arc<std::sync::RwLock<U>> {
191    fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError> {
192        match self.read() {
193            Ok(ref u) => Ok(f(u)),
194            Err(_) => Err(PoisonedThreadError::default()),
195        }
196    }
197    fn unwrap(self) -> Result<U, PoisonedThreadError> {
198        match std::sync::Arc::try_unwrap(self) {
199            Ok(rwlock) => match rwlock.into_inner() {
200                Ok(u) => Ok(u),
201                Err(_) => Err(PoisonedThreadError::default()),
202            },
203            Err(_) => Err(PoisonedThreadError::default()),
204        }
205    }
206}
207
208pub trait BatchSpawnable<
209    ReturnValue: Send + 'static,
210    ExtraInput: Send + 'static,
211    Alloc: BrotliAlloc + Send + 'static,
212    U: Send + 'static + Sync,
213> where
214    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
215{
216    type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
217    type FinalJoinHandle: OwnedRetriever<U>;
218    // this function takes in an input slice
219    // a SendAlloc per thread and converts them all into JoinHandle
220    // the input is borrowed until the joins complete
221    // owned is set to borrowed
222    // the final join handle is a r/w lock which will return the SliceW to the owner
223    // the FinalJoinHandle is only to be called when each individual JoinHandle has been examined
224    // the function is called with the thread_index, the num_threads, a reference to the slice under a read lock,
225    // and an allocator from the alloc_per_thread
226    fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
227    fn spawn<F: Fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue + Send + 'static + Copy>(
228        &mut self,
229        handle: &mut Self::FinalJoinHandle,
230        alloc: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
231        index: usize,
232        num_threads: usize,
233        f: F,
234    );
235}
236
237pub trait BatchSpawnableLite<
238    ReturnValue: Send + 'static,
239    ExtraInput: Send + 'static,
240    Alloc: BrotliAlloc + Send + 'static,
241    U: Send + 'static + Sync,
242> where
243    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
244{
245    type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
246    type FinalJoinHandle: OwnedRetriever<U>;
247    fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
248    fn spawn(
249        &mut self,
250        handle: &mut Self::FinalJoinHandle,
251        alloc_per_thread: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
252        index: usize,
253        num_threads: usize,
254        f: fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue,
255    );
256}
257/*
258impl<ReturnValue:Send+'static,
259     ExtraInput:Send+'static,
260     Alloc:BrotliAlloc+Send+'static,
261     U:Send+'static+Sync>
262     BatchSpawnableLite<T, Alloc, U> for BatchSpawnable<T, Alloc, U> {
263  type JoinHandle = <Self as BatchSpawnable<T, Alloc, U>>::JoinHandle;
264  type FinalJoinHandle = <Self as BatchSpawnable<T, Alloc, U>>::FinalJoinHandle;
265  fn batch_spawn(
266    &mut self,
267    input: &mut Owned<U>,
268    alloc_per_thread:&mut [SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>],
269    f: fn(usize, usize, &U, Alloc) -> T,
270  ) -> Self::FinalJoinHandle {
271   <Self as BatchSpawnable<ReturnValue, ExtraInput,  Alloc, U>>::batch_spawn(self, input, alloc_per_thread, f)
272  }
273}*/
274
275pub fn CompressMultiSlice<
276    Alloc: BrotliAlloc + Send + 'static,
277    Spawner: BatchSpawnableLite<
278        CompressionThreadResult<Alloc>,
279        UnionHasher<Alloc>,
280        Alloc,
281        (
282            <Alloc as Allocator<u8>>::AllocatedMemory,
283            BrotliEncoderParams,
284        ),
285    >,
286>(
287    params: &BrotliEncoderParams,
288    input_slice: &[u8],
289    output: &mut [u8],
290    alloc_per_thread: &mut [SendAlloc<
291        CompressionThreadResult<Alloc>,
292        UnionHasher<Alloc>,
293        Alloc,
294        Spawner::JoinHandle,
295    >],
296    thread_spawner: &mut Spawner,
297) -> Result<usize, BrotliEncoderThreadError>
298where
299    <Alloc as Allocator<u8>>::AllocatedMemory: Send + Sync,
300    <Alloc as Allocator<u16>>::AllocatedMemory: Send + Sync,
301    <Alloc as Allocator<u32>>::AllocatedMemory: Send + Sync,
302{
303    let input = if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 {
304        let mut input = allocate::<u8, _>(alloc, input_slice.len());
305        input.slice_mut().clone_from_slice(input_slice);
306        input
307    } else {
308        alloc_default::<u8, Alloc>()
309    };
310    let mut owned_input = Owned::new(input);
311    let ret = CompressMulti(
312        params,
313        &mut owned_input,
314        output,
315        alloc_per_thread,
316        thread_spawner,
317    );
318    if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 {
319        <Alloc as Allocator<u8>>::free_cell(alloc, owned_input.unwrap());
320    }
321    ret
322}
323
324fn get_range(thread_index: usize, num_threads: usize, file_size: usize) -> Range<usize> {
325    ((thread_index * file_size) / num_threads)..(((thread_index + 1) * file_size) / num_threads)
326}
327
328fn compress_part<Alloc: BrotliAlloc + Send + 'static, SliceW: SliceWrapper<u8>>(
329    hasher: UnionHasher<Alloc>,
330    thread_index: usize,
331    num_threads: usize,
332    input_and_params: &(SliceW, BrotliEncoderParams),
333    mut alloc: Alloc,
334) -> CompressionThreadResult<Alloc>
335where
336    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
337{
338    let mut range = get_range(thread_index, num_threads, input_and_params.0.len());
339    let mut mem = allocate::<u8, _>(
340        &mut alloc,
341        BrotliEncoderMaxCompressedSize(range.end - range.start),
342    );
343    let mut state = BrotliEncoderStateStruct::new(alloc);
344    state.params = input_and_params.1.clone();
345    if thread_index != 0 {
346        state.params.catable = true; // make sure we can concatenate this to the other work results
347        state.params.magic_number = false; // no reason to pepper this around
348    }
349    state.params.appendable = true; // make sure we are at least appendable, so that future items can be catted in
350    if thread_index != 0 {
351        state.set_custom_dictionary_with_optional_precomputed_hasher(
352            range.start,
353            &input_and_params.0.slice()[..range.start],
354            hasher,
355        );
356    }
357    let mut out_offset = 0usize;
358    let compression_result;
359    let mut available_out = mem.len();
360    loop {
361        let mut next_in_offset = 0usize;
362        let mut available_in = range.end - range.start;
363        let result = state.compress_stream(
364            BrotliEncoderOperation::BROTLI_OPERATION_FINISH,
365            &mut available_in,
366            &input_and_params.0.slice()[range.clone()],
367            &mut next_in_offset,
368            &mut available_out,
369            mem.slice_mut(),
370            &mut out_offset,
371            &mut None,
372            &mut |_a, _b, _c, _d| (),
373        );
374        let new_range = range.start + next_in_offset..range.end;
375        range = new_range;
376        if result {
377            compression_result = Ok(out_offset);
378            break;
379        } else if available_out == 0 {
380            compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace); // mark no space??
381            break;
382        }
383    }
384    BrotliEncoderDestroyInstance(&mut state);
385    match compression_result {
386        Ok(size) => CompressionThreadResult::<Alloc> {
387            compressed: Ok(CompressedFileChunk {
388                data_backing: mem,
389                data_size: size,
390            }),
391            alloc: state.m8,
392        },
393        Err(e) => {
394            <Alloc as Allocator<u8>>::free_cell(&mut state.m8, mem);
395            CompressionThreadResult::<Alloc> {
396                compressed: Err(e),
397                alloc: state.m8,
398            }
399        }
400    }
401}
402
403pub fn CompressMulti<
404    Alloc: BrotliAlloc + Send + 'static,
405    SliceW: SliceWrapper<u8> + Send + 'static + Sync,
406    Spawner: BatchSpawnableLite<
407        CompressionThreadResult<Alloc>,
408        UnionHasher<Alloc>,
409        Alloc,
410        (SliceW, BrotliEncoderParams),
411    >,
412>(
413    params: &BrotliEncoderParams,
414    owned_input: &mut Owned<SliceW>,
415    output: &mut [u8],
416    alloc_per_thread: &mut [SendAlloc<
417        CompressionThreadResult<Alloc>,
418        UnionHasher<Alloc>,
419        Alloc,
420        Spawner::JoinHandle,
421    >],
422    thread_spawner: &mut Spawner,
423) -> Result<usize, BrotliEncoderThreadError>
424where
425    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
426    <Alloc as Allocator<u16>>::AllocatedMemory: Send,
427    <Alloc as Allocator<u32>>::AllocatedMemory: Send,
428{
429    let num_threads = alloc_per_thread.len();
430    let actually_owned_mem = mem::replace(owned_input, Owned(InternalOwned::Borrowed));
431    let mut owned_input_pair = Owned::new((actually_owned_mem.unwrap(), params.clone()));
432    // start thread spawner
433    let mut spawner_and_input = thread_spawner.make_spawner(&mut owned_input_pair);
434    if num_threads > 1 {
435        // spawn first thread without "custom dictionary" while we compute the custom dictionary for other work items
436        thread_spawner.spawn(
437            &mut spawner_and_input,
438            &mut alloc_per_thread[0],
439            0,
440            num_threads,
441            compress_part,
442        );
443    }
444    // populate all hashers at once, cloning them one by one
445    let mut compression_last_thread_result;
446    if num_threads > 1 && params.favor_cpu_efficiency {
447        let mut local_params = params.clone();
448        SanitizeParams(&mut local_params);
449        let mut hasher = UnionHasher::Uninit;
450        hasher_setup(
451            alloc_per_thread[num_threads - 1].0.unwrap_input().0,
452            &mut hasher,
453            &mut local_params,
454            &[],
455            0,
456            0,
457            false,
458        );
459        for thread_index in 1..num_threads {
460            let res = spawner_and_input.view(|input_and_params: &(SliceW, BrotliEncoderParams)| {
461                let range = get_range(thread_index - 1, num_threads, input_and_params.0.len());
462                let overlap = hasher.StoreLookahead().wrapping_sub(1);
463                if range.end - range.start > overlap {
464                    hasher.BulkStoreRange(
465                        input_and_params.0.slice(),
466                        usize::MAX,
467                        if range.start > overlap {
468                            range.start - overlap
469                        } else {
470                            0
471                        },
472                        range.end - overlap,
473                    );
474                }
475            });
476            if let Err(_e) = res {
477                return Err(BrotliEncoderThreadError::OtherThreadPanic);
478            }
479            if thread_index + 1 != num_threads {
480                {
481                    let (alloc, out_hasher) = alloc_per_thread[thread_index].unwrap_view_mut();
482                    *out_hasher = hasher.clone_with_alloc(alloc);
483                }
484                thread_spawner.spawn(
485                    &mut spawner_and_input,
486                    &mut alloc_per_thread[thread_index],
487                    thread_index,
488                    num_threads,
489                    compress_part,
490                );
491            }
492        }
493        let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default();
494        compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> {
495        compress_part(hasher,
496                      num_threads - 1,
497                      num_threads,
498                      input_and_params,
499                      alloc,
500        )
501      });
502    } else {
503        if num_threads > 1 {
504            for thread_index in 1..num_threads - 1 {
505                thread_spawner.spawn(
506                    &mut spawner_and_input,
507                    &mut alloc_per_thread[thread_index],
508                    thread_index,
509                    num_threads,
510                    compress_part,
511                );
512            }
513        }
514        let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default();
515        compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> {
516        compress_part(UnionHasher::Uninit,
517                      num_threads - 1,
518                      num_threads,
519                      input_and_params,
520                      alloc,
521        )
522      });
523    }
524    let mut compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace);
525    let mut out_file_size = 0usize;
526    let mut bro_cat_li = BroCatli::new();
527    for (index, thread) in alloc_per_thread.iter_mut().enumerate() {
528        let mut cur_result = if index + 1 == num_threads {
529            match mem::replace(&mut compression_last_thread_result, Err(())) {
530                Ok(result) => result,
531                Err(_err) => return Err(BrotliEncoderThreadError::OtherThreadPanic),
532            }
533        } else {
534            match mem::replace(
535                &mut thread.0,
536                InternalSendAlloc::SpawningOrJoining(PhantomData),
537            ) {
538                InternalSendAlloc::A(_, _) | InternalSendAlloc::SpawningOrJoining(_) => {
539                    panic!("Thread not properly spawned")
540                }
541                InternalSendAlloc::Join(join) => match join.join() {
542                    Ok(result) => result,
543                    Err(err) => {
544                        return Err(err);
545                    }
546                },
547            }
548        };
549        match cur_result.compressed {
550            Ok(compressed_out) => {
551                bro_cat_li.new_brotli_file();
552                let mut in_offset = 0usize;
553                let cat_result = bro_cat_li.stream(
554                    &compressed_out.data_backing.slice()[..compressed_out.data_size],
555                    &mut in_offset,
556                    output,
557                    &mut out_file_size,
558                );
559                match cat_result {
560                    BroCatliResult::Success | BroCatliResult::NeedsMoreInput => {
561                        compression_result = Ok(out_file_size);
562                    }
563                    BroCatliResult::NeedsMoreOutput => {
564                        compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace);
565                        // not enough space
566                    }
567                    err => {
568                        compression_result = Err(BrotliEncoderThreadError::ConcatenationError(err));
569                        // misc error
570                    }
571                }
572                <Alloc as Allocator<u8>>::free_cell(
573                    &mut cur_result.alloc,
574                    compressed_out.data_backing,
575                );
576            }
577            Err(e) => {
578                compression_result = Err(e);
579            }
580        }
581        thread.0 = InternalSendAlloc::A(cur_result.alloc, UnionHasher::Uninit);
582    }
583    compression_result?;
584    match bro_cat_li.finish(output, &mut out_file_size) {
585        BroCatliResult::Success => compression_result = Ok(out_file_size),
586        err => {
587            compression_result = Err(BrotliEncoderThreadError::ConcatenationFinalizationError(
588                err,
589            ))
590        }
591    }
592    if let Ok(retrieved_owned_input) = spawner_and_input.unwrap() {
593        *owned_input = Owned::new(retrieved_owned_input.0); // return the input to its rightful owner before returning
594    } else if compression_result.is_ok() {
595        compression_result = Err(BrotliEncoderThreadError::OtherThreadPanic);
596    }
597    compression_result
598}