Unnamed repository; edit this file 'description' to name the repository.
Add proof-of-concept QoS implementation
Luna Razzaghipour 2023-05-20
parent bb78059 · commit ca6461c
-rw-r--r--Cargo.lock6
-rw-r--r--crates/flycheck/Cargo.toml1
-rw-r--r--crates/flycheck/src/lib.rs8
-rw-r--r--crates/ide/src/prime_caches.rs6
-rw-r--r--crates/proc-macro-srv/Cargo.toml1
-rw-r--r--crates/rust-analyzer/Cargo.toml1
-rw-r--r--crates/rust-analyzer/src/bin/main.rs17
-rw-r--r--crates/rust-analyzer/src/task_pool.rs25
-rw-r--r--crates/rust-analyzer/tests/slow-tests/support.rs4
-rw-r--r--crates/stdx/Cargo.toml1
-rw-r--r--crates/stdx/src/lib.rs1
-rw-r--r--crates/stdx/src/thread.rs200
-rw-r--r--crates/vfs-notify/Cargo.toml2
-rw-r--r--crates/vfs-notify/src/lib.rs4
14 files changed, 254 insertions, 23 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 7980823a85..e7ae42a2d9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -408,7 +408,6 @@ dependencies = [
"cargo_metadata",
"command-group",
"crossbeam-channel",
- "jod-thread",
"paths",
"rustc-hash",
"serde",
@@ -1278,6 +1277,7 @@ dependencies = [
"paths",
"proc-macro-api",
"proc-macro-test",
+ "stdx",
"tt",
]
@@ -1493,7 +1493,6 @@ dependencies = [
"ide-db",
"ide-ssr",
"itertools",
- "jod-thread",
"lsp-server",
"lsp-types",
"mbe",
@@ -1712,6 +1711,7 @@ version = "0.0.0"
dependencies = [
"always-assert",
"backtrace",
+ "jod-thread",
"libc",
"miow",
"winapi",
@@ -2123,9 +2123,9 @@ name = "vfs-notify"
version = "0.0.0"
dependencies = [
"crossbeam-channel",
- "jod-thread",
"notify",
"paths",
+ "stdx",
"tracing",
"vfs",
"walkdir",
diff --git a/crates/flycheck/Cargo.toml b/crates/flycheck/Cargo.toml
index 1e0b3605b1..3f6671b1c4 100644
--- a/crates/flycheck/Cargo.toml
+++ b/crates/flycheck/Cargo.toml
@@ -18,7 +18,6 @@ cargo_metadata = "0.15.0"
rustc-hash = "1.1.0"
serde_json.workspace = true
serde.workspace = true
-jod-thread = "0.1.2"
command-group = "2.0.1"
# local deps
diff --git a/crates/flycheck/src/lib.rs b/crates/flycheck/src/lib.rs
index accb14a51d..a4aa346a1c 100644
--- a/crates/flycheck/src/lib.rs
+++ b/crates/flycheck/src/lib.rs
@@ -77,7 +77,7 @@ impl fmt::Display for FlycheckConfig {
pub struct FlycheckHandle {
// XXX: drop order is significant
sender: Sender<StateChange>,
- _thread: jod_thread::JoinHandle,
+ _thread: stdx::thread::JoinHandle,
id: usize,
}
@@ -90,7 +90,7 @@ impl FlycheckHandle {
) -> FlycheckHandle {
let actor = FlycheckActor::new(id, sender, config, workspace_root);
let (sender, receiver) = unbounded::<StateChange>();
- let thread = jod_thread::Builder::new()
+ let thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Utility)
.name("Flycheck".to_owned())
.spawn(move || actor.run(receiver))
.expect("failed to spawn thread");
@@ -395,7 +395,7 @@ struct CargoHandle {
/// The handle to the actual cargo process. As we cannot cancel directly from with
/// a read syscall dropping and therefore terminating the process is our best option.
child: JodGroupChild,
- thread: jod_thread::JoinHandle<io::Result<(bool, String)>>,
+ thread: stdx::thread::JoinHandle<io::Result<(bool, String)>>,
receiver: Receiver<CargoMessage>,
}
@@ -409,7 +409,7 @@ impl CargoHandle {
let (sender, receiver) = unbounded();
let actor = CargoActor::new(sender, stdout, stderr);
- let thread = jod_thread::Builder::new()
+ let thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Utility)
.name("CargoHandle".to_owned())
.spawn(move || actor.run())
.expect("failed to spawn thread");
diff --git a/crates/ide/src/prime_caches.rs b/crates/ide/src/prime_caches.rs
index 2962700360..f049a225f0 100644
--- a/crates/ide/src/prime_caches.rs
+++ b/crates/ide/src/prime_caches.rs
@@ -80,7 +80,11 @@ pub(crate) fn parallel_prime_caches(
for _ in 0..num_worker_threads {
let worker = prime_caches_worker.clone();
let db = db.snapshot();
- std::thread::spawn(move || Cancelled::catch(|| worker(db)));
+
+ stdx::thread::Builder::new(stdx::thread::QoSClass::Utility)
+ .allow_leak(true)
+ .spawn(move || Cancelled::catch(|| worker(db)))
+ .expect("failed to spawn thread");
}
(work_sender, progress_receiver)
diff --git a/crates/proc-macro-srv/Cargo.toml b/crates/proc-macro-srv/Cargo.toml
index f7f07cfcb2..d5eb157bfe 100644
--- a/crates/proc-macro-srv/Cargo.toml
+++ b/crates/proc-macro-srv/Cargo.toml
@@ -22,6 +22,7 @@ object = { version = "0.30.2", default-features = false, features = [
libloading = "0.7.3"
memmap2 = "0.5.4"
+stdx.workspace = true
tt.workspace = true
mbe.workspace = true
paths.workspace = true
diff --git a/crates/rust-analyzer/Cargo.toml b/crates/rust-analyzer/Cargo.toml
index ae5b8e4c42..3f795340b2 100644
--- a/crates/rust-analyzer/Cargo.toml
+++ b/crates/rust-analyzer/Cargo.toml
@@ -86,7 +86,6 @@ jemallocator = { version = "0.5.0", package = "tikv-jemallocator", optional = tr
[dev-dependencies]
expect-test = "1.4.0"
-jod-thread = "0.1.2"
xshell = "0.2.2"
test-utils.workspace = true
diff --git a/crates/rust-analyzer/src/bin/main.rs b/crates/rust-analyzer/src/bin/main.rs
index 992e174a42..660a780eb0 100644
--- a/crates/rust-analyzer/src/bin/main.rs
+++ b/crates/rust-analyzer/src/bin/main.rs
@@ -78,7 +78,7 @@ fn try_main(flags: flags::RustAnalyzer) -> Result<()> {
println!("rust-analyzer {}", rust_analyzer::version());
return Ok(());
}
- with_extra_thread("LspServer", run_server)?;
+ with_extra_thread("LspServer", stdx::thread::QoSClass::Utility, run_server)?;
}
flags::RustAnalyzerCmd::Parse(cmd) => cmd.run()?,
flags::RustAnalyzerCmd::Symbols(cmd) => cmd.run()?,
@@ -136,14 +136,17 @@ const STACK_SIZE: usize = 1024 * 1024 * 8;
/// space.
fn with_extra_thread(
thread_name: impl Into<String>,
+ qos_class: stdx::thread::QoSClass,
f: impl FnOnce() -> Result<()> + Send + 'static,
) -> Result<()> {
- let handle =
- std::thread::Builder::new().name(thread_name.into()).stack_size(STACK_SIZE).spawn(f)?;
- match handle.join() {
- Ok(res) => res,
- Err(panic) => std::panic::resume_unwind(panic),
- }
+ let handle = stdx::thread::Builder::new(qos_class)
+ .name(thread_name.into())
+ .stack_size(STACK_SIZE)
+ .spawn(f)?;
+
+ handle.join()?;
+
+ Ok(())
}
fn run_server() -> Result<()> {
diff --git a/crates/rust-analyzer/src/task_pool.rs b/crates/rust-analyzer/src/task_pool.rs
index 616e449984..0c5a4f3055 100644
--- a/crates/rust-analyzer/src/task_pool.rs
+++ b/crates/rust-analyzer/src/task_pool.rs
@@ -1,5 +1,7 @@
//! A thin wrapper around `ThreadPool` to make sure that we join all things
//! properly.
+use std::sync::{Arc, Barrier};
+
use crossbeam_channel::Sender;
pub(crate) struct TaskPool<T> {
@@ -16,6 +18,18 @@ impl<T> TaskPool<T> {
.thread_stack_size(STACK_SIZE)
.num_threads(threads)
.build();
+
+ // Set QoS of all threads in threadpool.
+ let barrier = Arc::new(Barrier::new(threads + 1));
+ for _ in 0..threads {
+ let barrier = barrier.clone();
+ inner.execute(move || {
+ stdx::thread::set_current_thread_qos_class(stdx::thread::QoSClass::Utility);
+ barrier.wait();
+ });
+ }
+ barrier.wait();
+
TaskPool { sender, inner }
}
@@ -26,7 +40,16 @@ impl<T> TaskPool<T> {
{
self.inner.execute({
let sender = self.sender.clone();
- move || sender.send(task()).unwrap()
+ move || {
+ if stdx::thread::IS_QOS_AVAILABLE {
+ debug_assert_eq!(
+ stdx::thread::get_current_thread_qos_class(),
+ Some(stdx::thread::QoSClass::Utility)
+ );
+ }
+
+ sender.send(task()).unwrap()
+ }
})
}
diff --git a/crates/rust-analyzer/tests/slow-tests/support.rs b/crates/rust-analyzer/tests/slow-tests/support.rs
index d0eeee189c..33d7f6576c 100644
--- a/crates/rust-analyzer/tests/slow-tests/support.rs
+++ b/crates/rust-analyzer/tests/slow-tests/support.rs
@@ -155,7 +155,7 @@ pub(crate) fn project(fixture: &str) -> Server {
pub(crate) struct Server {
req_id: Cell<i32>,
messages: RefCell<Vec<Message>>,
- _thread: jod_thread::JoinHandle<()>,
+ _thread: stdx::thread::JoinHandle,
client: Connection,
/// XXX: remove the tempdir last
dir: TestDir,
@@ -165,7 +165,7 @@ impl Server {
fn new(dir: TestDir, config: Config) -> Server {
let (connection, client) = Connection::memory();
- let _thread = jod_thread::Builder::new()
+ let _thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Utility)
.name("test server".to_string())
.spawn(move || main_loop(config, connection).unwrap())
.expect("failed to spawn a thread");
diff --git a/crates/stdx/Cargo.toml b/crates/stdx/Cargo.toml
index c881f2fd3f..986e3fcdcf 100644
--- a/crates/stdx/Cargo.toml
+++ b/crates/stdx/Cargo.toml
@@ -15,6 +15,7 @@ doctest = false
libc = "0.2.135"
backtrace = { version = "0.3.65", optional = true }
always-assert = { version = "0.1.2", features = ["log"] }
+jod-thread = "0.1.2"
# Think twice before adding anything here
[target.'cfg(windows)'.dependencies]
diff --git a/crates/stdx/src/lib.rs b/crates/stdx/src/lib.rs
index 8df86e8100..24990d6a0e 100644
--- a/crates/stdx/src/lib.rs
+++ b/crates/stdx/src/lib.rs
@@ -11,6 +11,7 @@ pub mod process;
pub mod panic_context;
pub mod non_empty_vec;
pub mod rand;
+pub mod thread;
pub use always_assert::{always, never};
diff --git a/crates/stdx/src/thread.rs b/crates/stdx/src/thread.rs
new file mode 100644
index 0000000000..2bf9141cbf
--- /dev/null
+++ b/crates/stdx/src/thread.rs
@@ -0,0 +1,200 @@
+//! A utility module for working with threads that automatically joins threads upon drop
+//! and provides functionality for interfacing with operating system quality of service (QoS) APIs.
+//!
+//! As a system, rust-analyzer should have the property that
+//! old manual scheduling APIs are replaced entirely by QoS.
+//! To maintain this invariant, we panic when it is clear that
+//! old scheduling APIs have been used.
+//!
+//! Moreover, we also want to ensure that every thread has a QoS set explicitly
+//! to force a decision about its importance to the system.
+//! Thus, [`QoSClass`] has no default value
+//! and every entry point to creating a thread requires a [`QoSClass`] upfront.
+
+use std::fmt;
+
+pub fn spawn<F, T>(qos_class: QoSClass, f: F) -> JoinHandle<T>
+where
+ F: FnOnce() -> T,
+ F: Send + 'static,
+ T: Send + 'static,
+{
+ Builder::new(qos_class).spawn(f).expect("failed to spawn thread")
+}
+
+pub struct Builder {
+ qos_class: QoSClass,
+ inner: jod_thread::Builder,
+ allow_leak: bool,
+}
+
+impl Builder {
+ pub fn new(qos_class: QoSClass) -> Builder {
+ Builder { qos_class, inner: jod_thread::Builder::new(), allow_leak: false }
+ }
+
+ pub fn name(self, name: String) -> Builder {
+ Builder { inner: self.inner.name(name), ..self }
+ }
+
+ pub fn stack_size(self, size: usize) -> Builder {
+ Builder { inner: self.inner.stack_size(size), ..self }
+ }
+
+ pub fn allow_leak(self, b: bool) -> Builder {
+ Builder { allow_leak: b, ..self }
+ }
+
+ pub fn spawn<F, T>(self, f: F) -> std::io::Result<JoinHandle<T>>
+ where
+ F: FnOnce() -> T,
+ F: Send + 'static,
+ T: Send + 'static,
+ {
+ let inner_handle = self.inner.spawn(move || {
+ set_current_thread_qos_class(self.qos_class);
+ f()
+ })?;
+
+ Ok(JoinHandle { inner: Some(inner_handle), allow_leak: self.allow_leak })
+ }
+}
+
+pub struct JoinHandle<T = ()> {
+ // `inner` is an `Option` so that we can
+ // take ownership of the contained `JoinHandle`
+ // in the `Drop` impl below.
+ inner: Option<jod_thread::JoinHandle<T>>,
+ allow_leak: bool,
+}
+
+impl<T> JoinHandle<T> {
+ pub fn join(mut self) -> T {
+ self.inner.take().unwrap().join()
+ }
+}
+
+impl<T> Drop for JoinHandle<T> {
+ fn drop(&mut self) {
+ if !self.allow_leak {
+ return;
+ }
+
+ if let Some(join_handle) = self.inner.take() {
+ join_handle.detach();
+ }
+ }
+}
+
+impl<T> fmt::Debug for JoinHandle<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.pad("JoinHandle { .. }")
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+pub enum QoSClass {
+ // Maintain order in priority from least to most.
+ Background,
+ Utility,
+ UserInitiated,
+ UserInteractive,
+}
+
+#[cfg(target_vendor = "apple")]
+pub const IS_QOS_AVAILABLE: bool = true;
+
+#[cfg(not(target_vendor = "apple"))]
+pub const IS_QOS_AVAILABLE: bool = false;
+
+// All Apple platforms use XNU as their kernel
+// and thus have the concept of QoS.
+#[cfg(target_vendor = "apple")]
+pub fn set_current_thread_qos_class(class: QoSClass) {
+ let c = match class {
+ QoSClass::UserInteractive => libc::qos_class_t::QOS_CLASS_USER_INTERACTIVE,
+ QoSClass::UserInitiated => libc::qos_class_t::QOS_CLASS_USER_INITIATED,
+ QoSClass::Utility => libc::qos_class_t::QOS_CLASS_UTILITY,
+ QoSClass::Background => libc::qos_class_t::QOS_CLASS_BACKGROUND,
+ };
+
+ let code = unsafe { libc::pthread_set_qos_class_self_np(c, 0) };
+
+ if code == 0 {
+ return;
+ }
+
+ let errno = unsafe { *libc::__error() };
+
+ match errno {
+ libc::EPERM => {
+ // This thread has been excluded from the QoS system
+ // due to a previous call to a function such as `pthread_setschedparam`
+ // which is incompatible with QoS.
+ //
+ // Let’s just panic here because rust-analyzer as a system
+ // should have the property that QoS is used consistently
+ // instead of old manual scheduling management APIs.
+ panic!("tried to set QoS of thread which has opted out of QoS (os error {errno})")
+ }
+
+ libc::EINVAL => {
+ // This is returned if we pass something other than a qos_class_t
+ // to `pthread_set_qos_class_self_np`.
+ //
+ // This is impossible, so again panic.
+ unreachable!("invalid qos_class_t value was passed to pthread_set_qos_class_self_np")
+ }
+
+ _ => {
+ // `pthread_set_qos_class_self_np`’s documentation
+ // does not mention any other errors.
+ unreachable!("`pthread_set_qos_class_self_np` returned unexpected error {errno}")
+ }
+ }
+}
+
+#[cfg(not(target_vendor = "apple"))]
+pub fn set_current_thread_qos_class(class: QoSClass) {
+ // FIXME: Windows has QoS APIs, we should use them!
+}
+
+#[cfg(target_vendor = "apple")]
+pub fn get_current_thread_qos_class() -> Option<QoSClass> {
+ let current_thread = unsafe { libc::pthread_self() };
+ let mut qos_class_raw = libc::qos_class_t::QOS_CLASS_UNSPECIFIED;
+ let code = unsafe {
+ libc::pthread_get_qos_class_np(current_thread, &mut qos_class_raw, std::ptr::null_mut())
+ };
+
+ if code != 0 {
+ // `pthread_get_qos_class_np`’s documentation states that
+ // an error value is placed into errno if the return code is not zero.
+ // However, it never states what errors are possible.
+ // Inspecting the source[0] shows that, as of this writing, it always returns zero.
+ //
+ // Whatever errors the function could report in future are likely to be
+ // ones which we cannot handle anyway
+ //
+ // 0: https://github.com/apple-oss-distributions/libpthread/blob/67e155c94093be9a204b69637d198eceff2c7c46/src/qos.c#L171-L177
+ let errno = unsafe { *libc::__error() };
+ unreachable!("`pthread_get_qos_class_np` failed unexpectedly (os error {errno})");
+ }
+
+ match qos_class_raw {
+ libc::qos_class_t::QOS_CLASS_USER_INTERACTIVE => Some(QoSClass::UserInteractive),
+ libc::qos_class_t::QOS_CLASS_USER_INITIATED => Some(QoSClass::UserInitiated),
+ libc::qos_class_t::QOS_CLASS_DEFAULT => None, // QoS has never been set
+ libc::qos_class_t::QOS_CLASS_UTILITY => Some(QoSClass::Utility),
+ libc::qos_class_t::QOS_CLASS_BACKGROUND => Some(QoSClass::Background),
+ libc::qos_class_t::QOS_CLASS_UNSPECIFIED => {
+ // We panic here because rust-analyzer should never use
+ panic!("tried to get QoS of thread which has opted out of QoS")
+ }
+ }
+}
+
+#[cfg(not(target_vendor = "apple"))]
+pub fn get_current_thread_qos_class() -> Option<QoSClass> {
+ None
+}
diff --git a/crates/vfs-notify/Cargo.toml b/crates/vfs-notify/Cargo.toml
index e06b98d811..5d61a22728 100644
--- a/crates/vfs-notify/Cargo.toml
+++ b/crates/vfs-notify/Cargo.toml
@@ -13,10 +13,10 @@ doctest = false
[dependencies]
tracing = "0.1.35"
-jod-thread = "0.1.2"
walkdir = "2.3.2"
crossbeam-channel = "0.5.5"
notify = "5.0"
+stdx.workspace = true
vfs.workspace = true
paths.workspace = true
diff --git a/crates/vfs-notify/src/lib.rs b/crates/vfs-notify/src/lib.rs
index c95304e55a..26f7a9fc42 100644
--- a/crates/vfs-notify/src/lib.rs
+++ b/crates/vfs-notify/src/lib.rs
@@ -21,7 +21,7 @@ use walkdir::WalkDir;
pub struct NotifyHandle {
// Relative order of fields below is significant.
sender: Sender<Message>,
- _thread: jod_thread::JoinHandle,
+ _thread: stdx::thread::JoinHandle,
}
#[derive(Debug)]
@@ -34,7 +34,7 @@ impl loader::Handle for NotifyHandle {
fn spawn(sender: loader::Sender) -> NotifyHandle {
let actor = NotifyActor::new(sender);
let (sender, receiver) = unbounded::<Message>();
- let thread = jod_thread::Builder::new()
+ let thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Utility)
.name("VfsLoader".to_owned())
.spawn(move || actor.run(receiver))
.expect("failed to spawn thread");