Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'crates/stdx/src/thread/pool.rs')
-rw-r--r--crates/stdx/src/thread/pool.rs95
1 files changed, 95 insertions, 0 deletions
diff --git a/crates/stdx/src/thread/pool.rs b/crates/stdx/src/thread/pool.rs
new file mode 100644
index 0000000000..b4ab9cb292
--- /dev/null
+++ b/crates/stdx/src/thread/pool.rs
@@ -0,0 +1,95 @@
+//! [`Pool`] implements a basic custom thread pool
+//! inspired by the [`threadpool` crate](http://docs.rs/threadpool).
+//! It allows the spawning of tasks under different QoS classes.
+//! rust-analyzer uses this to prioritize work based on latency requirements.
+//!
+//! The thread pool is implemented entirely using
+//! the threading utilities in [`crate::thread`].
+
+use std::sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+};
+
+use crossbeam_channel::{Receiver, Sender};
+
+use super::{
+ get_current_thread_qos_class, set_current_thread_qos_class, Builder, JoinHandle, QoSClass,
+ IS_QOS_AVAILABLE,
+};
+
+pub struct Pool {
+ // `_handles` is never read: the field is present
+ // only for its `Drop` impl.
+
+ // The worker threads exit once the channel closes;
+ // make sure to keep `job_sender` above `handles`
+ // so that the channel is actually closed
+ // before we join the worker threads!
+ job_sender: Sender<Job>,
+ _handles: Vec<JoinHandle>,
+ extant_tasks: Arc<AtomicUsize>,
+}
+
+struct Job {
+ requested_qos_class: QoSClass,
+ f: Box<dyn FnOnce() + Send + 'static>,
+}
+
+impl Pool {
+ pub fn new(threads: usize) -> Pool {
+ const STACK_SIZE: usize = 8 * 1024 * 1024;
+ const INITIAL_QOS_CLASS: QoSClass = QoSClass::Utility;
+
+ let (job_sender, job_receiver) = crossbeam_channel::unbounded();
+ let extant_tasks = Arc::new(AtomicUsize::new(0));
+
+ let mut handles = Vec::with_capacity(threads);
+ for _ in 0..threads {
+ let handle = Builder::new(INITIAL_QOS_CLASS)
+ .stack_size(STACK_SIZE)
+ .name("Worker".into())
+ .spawn({
+ let extant_tasks = Arc::clone(&extant_tasks);
+ let job_receiver: Receiver<Job> = job_receiver.clone();
+ move || {
+ let mut current_qos_class = INITIAL_QOS_CLASS;
+ for job in job_receiver {
+ if job.requested_qos_class != current_qos_class {
+ set_current_thread_qos_class(job.requested_qos_class);
+ current_qos_class = job.requested_qos_class;
+ }
+ extant_tasks.fetch_add(1, Ordering::SeqCst);
+ (job.f)();
+ extant_tasks.fetch_sub(1, Ordering::SeqCst);
+ }
+ }
+ })
+ .expect("failed to spawn thread");
+
+ handles.push(handle);
+ }
+
+ Pool { _handles: handles, extant_tasks, job_sender }
+ }
+
+ pub fn spawn<F>(&self, qos_class: QoSClass, f: F)
+ where
+ F: FnOnce() + Send + 'static,
+ {
+ let f = Box::new(move || {
+ if IS_QOS_AVAILABLE {
+ debug_assert_eq!(get_current_thread_qos_class(), Some(qos_class));
+ }
+
+ f()
+ });
+
+ let job = Job { requested_qos_class: qos_class, f };
+ self.job_sender.send(job).unwrap();
+ }
+
+ pub fn len(&self) -> usize {
+ self.extant_tasks.load(Ordering::SeqCst)
+ }
+}