Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'crates/stdx/src/thread/pool.rs')
| -rw-r--r-- | crates/stdx/src/thread/pool.rs | 95 |
1 files changed, 95 insertions, 0 deletions
diff --git a/crates/stdx/src/thread/pool.rs b/crates/stdx/src/thread/pool.rs new file mode 100644 index 0000000000..b4ab9cb292 --- /dev/null +++ b/crates/stdx/src/thread/pool.rs @@ -0,0 +1,95 @@ +//! [`Pool`] implements a basic custom thread pool +//! inspired by the [`threadpool` crate](http://docs.rs/threadpool). +//! It allows the spawning of tasks under different QoS classes. +//! rust-analyzer uses this to prioritize work based on latency requirements. +//! +//! The thread pool is implemented entirely using +//! the threading utilities in [`crate::thread`]. + +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +use crossbeam_channel::{Receiver, Sender}; + +use super::{ + get_current_thread_qos_class, set_current_thread_qos_class, Builder, JoinHandle, QoSClass, + IS_QOS_AVAILABLE, +}; + +pub struct Pool { + // `_handles` is never read: the field is present + // only for its `Drop` impl. + + // The worker threads exit once the channel closes; + // make sure to keep `job_sender` above `handles` + // so that the channel is actually closed + // before we join the worker threads! + job_sender: Sender<Job>, + _handles: Vec<JoinHandle>, + extant_tasks: Arc<AtomicUsize>, +} + +struct Job { + requested_qos_class: QoSClass, + f: Box<dyn FnOnce() + Send + 'static>, +} + +impl Pool { + pub fn new(threads: usize) -> Pool { + const STACK_SIZE: usize = 8 * 1024 * 1024; + const INITIAL_QOS_CLASS: QoSClass = QoSClass::Utility; + + let (job_sender, job_receiver) = crossbeam_channel::unbounded(); + let extant_tasks = Arc::new(AtomicUsize::new(0)); + + let mut handles = Vec::with_capacity(threads); + for _ in 0..threads { + let handle = Builder::new(INITIAL_QOS_CLASS) + .stack_size(STACK_SIZE) + .name("Worker".into()) + .spawn({ + let extant_tasks = Arc::clone(&extant_tasks); + let job_receiver: Receiver<Job> = job_receiver.clone(); + move || { + let mut current_qos_class = INITIAL_QOS_CLASS; + for job in job_receiver { + if job.requested_qos_class != current_qos_class { + set_current_thread_qos_class(job.requested_qos_class); + current_qos_class = job.requested_qos_class; + } + extant_tasks.fetch_add(1, Ordering::SeqCst); + (job.f)(); + extant_tasks.fetch_sub(1, Ordering::SeqCst); + } + } + }) + .expect("failed to spawn thread"); + + handles.push(handle); + } + + Pool { _handles: handles, extant_tasks, job_sender } + } + + pub fn spawn<F>(&self, qos_class: QoSClass, f: F) + where + F: FnOnce() + Send + 'static, + { + let f = Box::new(move || { + if IS_QOS_AVAILABLE { + debug_assert_eq!(get_current_thread_qos_class(), Some(qos_class)); + } + + f() + }); + + let job = Job { requested_qos_class: qos_class, f }; + self.job_sender.send(job).unwrap(); + } + + pub fn len(&self) -> usize { + self.extant_tasks.load(Ordering::SeqCst) + } +} |