Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'crates/ra-salsa/src/runtime/dependency_graph.rs')
-rw-r--r--crates/ra-salsa/src/runtime/dependency_graph.rs250
1 files changed, 250 insertions, 0 deletions
diff --git a/crates/ra-salsa/src/runtime/dependency_graph.rs b/crates/ra-salsa/src/runtime/dependency_graph.rs
new file mode 100644
index 0000000000..ed1d499f63
--- /dev/null
+++ b/crates/ra-salsa/src/runtime/dependency_graph.rs
@@ -0,0 +1,250 @@
+use triomphe::Arc;
+
+use crate::{DatabaseKeyIndex, RuntimeId};
+use parking_lot::{Condvar, MutexGuard};
+use rustc_hash::FxHashMap;
+use smallvec::SmallVec;
+
+use super::{ActiveQuery, WaitResult};
+
+type QueryStack = Vec<ActiveQuery>;
+
+#[derive(Debug, Default)]
+pub(super) struct DependencyGraph {
+ /// A `(K -> V)` pair in this map indicates that the runtime
+ /// `K` is blocked on some query executing in the runtime `V`.
+ /// This encodes a graph that must be acyclic (or else deadlock
+ /// will result).
+ edges: FxHashMap<RuntimeId, Edge>,
+
+ /// Encodes the `RuntimeId` that are blocked waiting for the result
+ /// of a given query.
+ query_dependents: FxHashMap<DatabaseKeyIndex, SmallVec<[RuntimeId; 4]>>,
+
+ /// When a key K completes which had dependent queries Qs blocked on it,
+ /// it stores its `WaitResult` here. As they wake up, each query Q in Qs will
+ /// come here to fetch their results.
+ wait_results: FxHashMap<RuntimeId, (QueryStack, WaitResult)>,
+}
+
+#[derive(Debug)]
+struct Edge {
+ blocked_on_id: RuntimeId,
+ blocked_on_key: DatabaseKeyIndex,
+ stack: QueryStack,
+
+ /// Signalled whenever a query with dependents completes.
+ /// Allows those dependents to check if they are ready to unblock.
+ condvar: Arc<parking_lot::Condvar>,
+}
+
+impl DependencyGraph {
+ /// True if `from_id` depends on `to_id`.
+ ///
+ /// (i.e., there is a path from `from_id` to `to_id` in the graph.)
+ pub(super) fn depends_on(&mut self, from_id: RuntimeId, to_id: RuntimeId) -> bool {
+ let mut p = from_id;
+ while let Some(q) = self.edges.get(&p).map(|edge| edge.blocked_on_id) {
+ if q == to_id {
+ return true;
+ }
+
+ p = q;
+ }
+ p == to_id
+ }
+
+ /// Invokes `closure` with a `&mut ActiveQuery` for each query that participates in the cycle.
+ /// The cycle runs as follows:
+ ///
+ /// 1. The runtime `from_id`, which has the stack `from_stack`, would like to invoke `database_key`...
+ /// 2. ...but `database_key` is already being executed by `to_id`...
+ /// 3. ...and `to_id` is transitively dependent on something which is present on `from_stack`.
+ pub(super) fn for_each_cycle_participant(
+ &mut self,
+ from_id: RuntimeId,
+ from_stack: &mut QueryStack,
+ database_key: DatabaseKeyIndex,
+ to_id: RuntimeId,
+ mut closure: impl FnMut(&mut [ActiveQuery]),
+ ) {
+ debug_assert!(self.depends_on(to_id, from_id));
+
+ // To understand this algorithm, consider this [drawing](https://is.gd/TGLI9v):
+ //
+ // database_key = QB2
+ // from_id = A
+ // to_id = B
+ // from_stack = [QA1, QA2, QA3]
+ //
+ // self.edges[B] = { C, QC2, [QB1..QB3] }
+ // self.edges[C] = { A, QA2, [QC1..QC3] }
+ //
+ // The cyclic
+ // edge we have
+ // failed to add.
+ // :
+ // A : B C
+ // :
+ // QA1 v QB1 QC1
+ // ┌► QA2 ┌──► QB2 ┌─► QC2
+ // │ QA3 ───┘ QB3 ──┘ QC3 ───┐
+ // │ │
+ // └───────────────────────────────┘
+ //
+ // Final output: [QB2, QB3, QC2, QC3, QA2, QA3]
+
+ let mut id = to_id;
+ let mut key = database_key;
+ while id != from_id {
+ // Looking at the diagram above, the idea is to
+ // take the edge from `to_id` starting at `key`
+ // (inclusive) and down to the end. We can then
+ // load up the next thread (i.e., we start at B/QB2,
+ // and then load up the dependency on C/QC2).
+ let edge = self.edges.get_mut(&id).unwrap();
+ let prefix = edge.stack.iter_mut().take_while(|p| p.database_key_index != key).count();
+ closure(&mut edge.stack[prefix..]);
+ id = edge.blocked_on_id;
+ key = edge.blocked_on_key;
+ }
+
+ // Finally, we copy in the results from `from_stack`.
+ let prefix = from_stack.iter_mut().take_while(|p| p.database_key_index != key).count();
+ closure(&mut from_stack[prefix..]);
+ }
+
+ /// Unblock each blocked runtime (excluding the current one) if some
+ /// query executing in that runtime is participating in cycle fallback.
+ ///
+ /// Returns a boolean (Current, Others) where:
+ /// * Current is true if the current runtime has cycle participants
+ /// with fallback;
+ /// * Others is true if other runtimes were unblocked.
+ pub(super) fn maybe_unblock_runtimes_in_cycle(
+ &mut self,
+ from_id: RuntimeId,
+ from_stack: &QueryStack,
+ database_key: DatabaseKeyIndex,
+ to_id: RuntimeId,
+ ) -> (bool, bool) {
+ // See diagram in `for_each_cycle_participant`.
+ let mut id = to_id;
+ let mut key = database_key;
+ let mut others_unblocked = false;
+ while id != from_id {
+ let edge = self.edges.get(&id).unwrap();
+ let prefix = edge.stack.iter().take_while(|p| p.database_key_index != key).count();
+ let next_id = edge.blocked_on_id;
+ let next_key = edge.blocked_on_key;
+
+ if let Some(cycle) = edge.stack[prefix..].iter().rev().find_map(|aq| aq.cycle.clone()) {
+ // Remove `id` from the list of runtimes blocked on `next_key`:
+ self.query_dependents.get_mut(&next_key).unwrap().retain(|r| *r != id);
+
+ // Unblock runtime so that it can resume execution once lock is released:
+ self.unblock_runtime(id, WaitResult::Cycle(cycle));
+
+ others_unblocked = true;
+ }
+
+ id = next_id;
+ key = next_key;
+ }
+
+ let prefix = from_stack.iter().take_while(|p| p.database_key_index != key).count();
+ let this_unblocked = from_stack[prefix..].iter().any(|aq| aq.cycle.is_some());
+
+ (this_unblocked, others_unblocked)
+ }
+
+ /// Modifies the graph so that `from_id` is blocked
+ /// on `database_key`, which is being computed by
+ /// `to_id`.
+ ///
+ /// For this to be reasonable, the lock on the
+ /// results table for `database_key` must be held.
+ /// This ensures that computing `database_key` doesn't
+ /// complete before `block_on` executes.
+ ///
+ /// Preconditions:
+ /// * No path from `to_id` to `from_id`
+ /// (i.e., `me.depends_on(to_id, from_id)` is false)
+ /// * `held_mutex` is a read lock (or stronger) on `database_key`
+ pub(super) fn block_on<QueryMutexGuard>(
+ mut me: MutexGuard<'_, Self>,
+ from_id: RuntimeId,
+ database_key: DatabaseKeyIndex,
+ to_id: RuntimeId,
+ from_stack: QueryStack,
+ query_mutex_guard: QueryMutexGuard,
+ ) -> (QueryStack, WaitResult) {
+ let condvar = me.add_edge(from_id, database_key, to_id, from_stack);
+
+ // Release the mutex that prevents `database_key`
+ // from completing, now that the edge has been added.
+ drop(query_mutex_guard);
+
+ loop {
+ if let Some(stack_and_result) = me.wait_results.remove(&from_id) {
+ debug_assert!(!me.edges.contains_key(&from_id));
+ return stack_and_result;
+ }
+ condvar.wait(&mut me);
+ }
+ }
+
+ /// Helper for `block_on`: performs actual graph modification
+ /// to add a dependency edge from `from_id` to `to_id`, which is
+ /// computing `database_key`.
+ fn add_edge(
+ &mut self,
+ from_id: RuntimeId,
+ database_key: DatabaseKeyIndex,
+ to_id: RuntimeId,
+ from_stack: QueryStack,
+ ) -> Arc<parking_lot::Condvar> {
+ assert_ne!(from_id, to_id);
+ debug_assert!(!self.edges.contains_key(&from_id));
+ debug_assert!(!self.depends_on(to_id, from_id));
+
+ let condvar = Arc::new(Condvar::new());
+ self.edges.insert(
+ from_id,
+ Edge {
+ blocked_on_id: to_id,
+ blocked_on_key: database_key,
+ stack: from_stack,
+ condvar: condvar.clone(),
+ },
+ );
+ self.query_dependents.entry(database_key).or_default().push(from_id);
+ condvar
+ }
+
+ /// Invoked when runtime `to_id` completes executing
+ /// `database_key`.
+ pub(super) fn unblock_runtimes_blocked_on(
+ &mut self,
+ database_key: DatabaseKeyIndex,
+ wait_result: WaitResult,
+ ) {
+ let dependents = self.query_dependents.remove(&database_key).unwrap_or_default();
+
+ for from_id in dependents {
+ self.unblock_runtime(from_id, wait_result.clone());
+ }
+ }
+
+ /// Unblock the runtime with the given id with the given wait-result.
+ /// This will cause it resume execution (though it will have to grab
+ /// the lock on this data structure first, to recover the wait result).
+ fn unblock_runtime(&mut self, id: RuntimeId, wait_result: WaitResult) {
+ let edge = self.edges.remove(&id).expect("not blocked");
+ self.wait_results.insert(id, (edge.stack, wait_result));
+
+ // Now that we have inserted the `wait_results`,
+ // notify the thread.
+ edge.condvar.notify_one();
+ }
+}