Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'crates/ra-salsa/src/runtime.rs')
-rw-r--r-- crates/ra-salsa/src/runtime.rs 668
1 file changed, 668 insertions, 0 deletions
diff --git a/crates/ra-salsa/src/runtime.rs b/crates/ra-salsa/src/runtime.rs
new file mode 100644
index 0000000000..5fe5f4b46d
--- /dev/null
+++ b/crates/ra-salsa/src/runtime.rs
@@ -0,0 +1,668 @@
+use crate::durability::Durability;
+use crate::hash::FxIndexSet;
+use crate::plumbing::CycleRecoveryStrategy;
+use crate::revision::{AtomicRevision, Revision};
+use crate::{Cancelled, Cycle, Database, DatabaseKeyIndex, Event, EventKind};
+use itertools::Itertools;
+use parking_lot::lock_api::{RawRwLock, RawRwLockRecursive};
+use parking_lot::{Mutex, RwLock};
+use std::hash::Hash;
+use std::panic::panic_any;
+use std::sync::atomic::{AtomicU32, Ordering};
+use tracing::debug;
+use triomphe::{Arc, ThinArc};
+
+mod dependency_graph;
+use dependency_graph::DependencyGraph;
+
+pub(crate) mod local_state;
+use local_state::LocalState;
+
+use self::local_state::{ActiveQueryGuard, QueryRevisions};
+
+/// The salsa runtime stores the storage for all queries as well as
+/// tracking the query stack and dependencies between cycles.
+///
+/// Each new runtime you create (e.g., via `Runtime::new` or
+/// `Runtime::default`) will have an independent set of query storage
+/// associated with it. Normally, therefore, you only do this once, at
+/// the start of your application.
+pub struct Runtime {
+    /// Our unique runtime id. The base runtime has id 0; snapshots
+    /// take fresh ids from `SharedState::next_id`.
+    id: RuntimeId,
+
+    /// If this is a "forked" runtime, then the `revision_guard` will
+    /// be `Some`; this guard holds a read-lock on the global query
+    /// lock.
+    revision_guard: Option<RevisionGuard>,
+
+    /// Local state that is specific to this runtime (thread).
+    local_state: LocalState,
+
+    /// Shared state that is accessible via all runtimes.
+    shared_state: Arc<SharedState>,
+}
+
+/// Outcome delivered to a runtime that was blocked waiting for a query
+/// being executed by another runtime.
+#[derive(Clone, Debug)]
+pub(crate) enum WaitResult {
+    /// The query completed normally; the waiter can proceed.
+    Completed,
+    /// The thread computing the query panicked; the waiter treats this
+    /// as cancellation (see `Runtime::block_on_or_unwind`).
+    Panicked,
+    /// The waiter participates in a cycle; the contained `Cycle` is
+    /// thrown so that cycle recovery (or a panic) can occur.
+    Cycle(Cycle),
+}
+
+impl Default for Runtime {
+    /// Builds the base runtime: id 0, no revision guard held, and
+    /// freshly initialized shared and local state.
+    fn default() -> Self {
+        let shared_state = Arc::new(SharedState::default());
+        let local_state = LocalState::default();
+        Runtime { id: RuntimeId { counter: 0 }, revision_guard: None, local_state, shared_state }
+    }
+}
+
+impl std::fmt::Debug for Runtime {
+    /// Debug output shows the runtime id, whether this is a forked
+    /// (snapshot) runtime, and the shared state.
+    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut builder = fmt.debug_struct("Runtime");
+        builder.field("id", &self.id());
+        builder.field("forked", &self.revision_guard.is_some());
+        builder.field("shared_state", &self.shared_state);
+        builder.finish()
+    }
+}
+
+impl Runtime {
+    /// Create a new runtime; equivalent to `Self::default`. This is
+    /// used when creating a new database.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// See [`crate::storage::Storage::snapshot`].
+    ///
+    /// Creates a "forked" runtime that shares this runtime's
+    /// `SharedState` but has its own query stack and a fresh id. The
+    /// fork holds a read-lock on the global query lock (via the
+    /// `RevisionGuard`), so writers block until the fork is dropped.
+    ///
+    /// Panics if invoked while a query is executing on this runtime.
+    pub(crate) fn snapshot(&self) -> Self {
+        if self.local_state.query_in_progress() {
+            panic!("it is not legal to `snapshot` during a query (see salsa-rs/salsa#80)");
+        }
+
+        let revision_guard = RevisionGuard::new(&self.shared_state);
+
+        // Snapshot ids start at 1; id 0 is the base runtime.
+        let id = RuntimeId { counter: self.shared_state.next_id.fetch_add(1, Ordering::SeqCst) };
+
+        Runtime {
+            id,
+            revision_guard: Some(revision_guard),
+            shared_state: self.shared_state.clone(),
+            local_state: Default::default(),
+        }
+    }
+
+    /// A "synthetic write" causes the system to act *as though* some
+    /// input of durability `durability` has changed. This is mostly
+    /// useful for profiling scenarios.
+    ///
+    /// **WARNING:** Just like an ordinary write, this method triggers
+    /// cancellation. If you invoke it while a snapshot exists, it
+    /// will block until that snapshot is dropped -- if that snapshot
+    /// is owned by the current thread, this could trigger deadlock.
+    pub fn synthetic_write(&mut self, durability: Durability) {
+        self.with_incremented_revision(|_next_revision| Some(durability));
+    }
+
+    /// The unique identifier attached to this `SalsaRuntime`. Each
+    /// snapshotted runtime has a distinct identifier.
+    #[inline]
+    pub fn id(&self) -> RuntimeId {
+        self.id
+    }
+
+    /// Returns the database-key for the query that this thread is
+    /// actively executing (if any).
+    pub fn active_query(&self) -> Option<DatabaseKeyIndex> {
+        self.local_state.active_query()
+    }
+
+    /// Read current value of the revision counter.
+    #[inline]
+    pub(crate) fn current_revision(&self) -> Revision {
+        // `revisions[0]` always holds the current revision; see the
+        // docs on `SharedState::revisions`.
+        self.shared_state.revisions[0].load()
+    }
+
+    /// The revision in which values with durability `d` may have last
+    /// changed. For D0, this is just the current revision. But for
+    /// higher levels of durability, this value may lag behind the
+    /// current revision. If we encounter a value of durability Di,
+    /// then, we can check this function to get a "bound" on when the
+    /// value may have changed, which allows us to skip walking its
+    /// dependencies.
+    #[inline]
+    pub(crate) fn last_changed_revision(&self, d: Durability) -> Revision {
+        self.shared_state.revisions[d.index()].load()
+    }
+
+    /// Read current value of the `pending_revision` counter. This is
+    /// typically equal to the current revision, but runs ahead of it
+    /// while a write is waiting to acquire the query lock (signalling
+    /// in-flight queries that they are cancelled).
+    #[inline]
+    pub(crate) fn pending_revision(&self) -> Revision {
+        self.shared_state.pending_revision.load()
+    }
+
+    /// Unwinds with `Cancelled::PendingWrite`, first reporting an
+    /// untracked read so the aborted query will be re-executed in the
+    /// next revision rather than treated as clean.
+    #[cold]
+    pub(crate) fn unwind_cancelled(&self) {
+        self.report_untracked_read();
+        Cancelled::PendingWrite.throw();
+    }
+
+    /// Acquires the **global query write lock** (ensuring that no queries are
+    /// executing) and then increments the current revision counter; invokes
+    /// `op` with the global query write lock still held.
+    ///
+    /// While we wait to acquire the global query write lock, this method will
+    /// also increment `pending_revision_increments`, thus signalling to queries
+    /// that their results are "cancelled" and they should abort as expeditiously
+    /// as possible.
+    ///
+    /// The `op` closure should actually perform the writes needed. It is given
+    /// the new revision as an argument, and its return value indicates whether
+    /// any pre-existing value was modified:
+    ///
+    /// - returning `None` means that no pre-existing value was modified (this
+    ///   could occur e.g. when setting some key on an input that was never set
+    ///   before)
+    /// - returning `Some(d)` indicates that a pre-existing value was modified
+    ///   and it had the durability `d`. This will update the records for when
+    ///   values with each durability were modified.
+    ///
+    /// Note that, given our writer model, we can assume that only one thread is
+    /// attempting to increment the global revision at a time.
+    pub(crate) fn with_incremented_revision<F>(&mut self, op: F)
+    where
+        F: FnOnce(Revision) -> Option<Durability>,
+    {
+        tracing::debug!("increment_revision()");
+
+        if !self.permits_increment() {
+            panic!("increment_revision invoked during a query computation");
+        }
+
+        // Set the `pending_revision` field so that people
+        // know current revision is cancelled.
+        let current_revision = self.shared_state.pending_revision.fetch_then_increment();
+
+        // To modify the revision, we need the lock.
+        // (Clone the `Arc` so the guard's borrow is independent of `self`.)
+        let shared_state = self.shared_state.clone();
+        let _lock = shared_state.query_lock.write();
+
+        // With the lock held, publish the new current revision; it must
+        // match what `pending_revision` announced above.
+        let old_revision = self.shared_state.revisions[0].fetch_then_increment();
+        assert_eq!(current_revision, old_revision);
+
+        let new_revision = current_revision.next();
+
+        debug!("increment_revision: incremented to {:?}", new_revision);
+
+        if let Some(d) = op(new_revision) {
+            // A value of durability `d` changed, so values of every
+            // durability up to `d` may have changed too; stamp those
+            // levels with the new revision (`revisions[0]` was already
+            // bumped above).
+            for rev in &self.shared_state.revisions[1..=d.index()] {
+                rev.store(new_revision);
+            }
+        }
+    }
+
+    /// True if it is currently legal to increment the revision: this is
+    /// not a forked runtime (which would hold the query read-lock) and
+    /// no query is executing on this thread.
+    pub(crate) fn permits_increment(&self) -> bool {
+        self.revision_guard.is_none() && !self.local_state.query_in_progress()
+    }
+
+    /// Pushes `database_key_index` onto this thread's query stack,
+    /// returning a guard for the active query frame.
+    #[inline]
+    pub(crate) fn push_query(&self, database_key_index: DatabaseKeyIndex) -> ActiveQueryGuard<'_> {
+        self.local_state.push_query(database_key_index)
+    }
+
+    /// Reports that the currently active query read the result from
+    /// another query.
+    ///
+    /// Also checks whether the "cycle participant" flag is set on
+    /// the current stack frame -- if so, panics with `CycleParticipant`
+    /// value, which should be caught by the code executing the query.
+    ///
+    /// # Parameters
+    ///
+    /// - `input`: the query whose result was read
+    /// - `durability`: the durability of that result
+    /// - `changed_at`: the last revision in which the result of that
+    ///   query had changed
+    pub(crate) fn report_query_read_and_unwind_if_cycle_resulted(
+        &self,
+        input: DatabaseKeyIndex,
+        durability: Durability,
+        changed_at: Revision,
+    ) {
+        self.local_state
+            .report_query_read_and_unwind_if_cycle_resulted(input, durability, changed_at);
+    }
+
+    /// Reports that the query depends on some state unknown to salsa.
+    ///
+    /// Queries which report untracked reads will be re-executed in the next
+    /// revision.
+    pub fn report_untracked_read(&self) {
+        self.local_state.report_untracked_read(self.current_revision());
+    }
+
+    /// Acts as though the current query had read an input with the given durability; this will force the current query's durability to be at most `durability`.
+    ///
+    /// This is mostly useful to control the durability level for [on-demand inputs](https://salsa-rs.github.io/salsa/common_patterns/on_demand_inputs.html).
+    pub fn report_synthetic_read(&self, durability: Durability) {
+        let changed_at = self.last_changed_revision(durability);
+        self.local_state.report_synthetic_read(durability, changed_at);
+    }
+
+    /// Handles a cycle in the dependency graph that was detected when the
+    /// current thread tried to block on `database_key_index` which is being
+    /// executed by `to_id`. If this function returns, then `to_id` no longer
+    /// depends on the current thread, and so we should continue executing
+    /// as normal. Otherwise, the function will throw a `Cycle` which is expected
+    /// to be caught by some frame on our stack. This occurs either if there is
+    /// a frame on our stack with cycle recovery (possibly the top one!) or if there
+    /// is no cycle recovery at all.
+    fn unblock_cycle_and_maybe_throw(
+        &self,
+        db: &dyn Database,
+        dg: &mut DependencyGraph,
+        database_key_index: DatabaseKeyIndex,
+        to_id: RuntimeId,
+    ) {
+        debug!("unblock_cycle_and_maybe_throw(database_key={:?})", database_key_index);
+
+        let mut from_stack = self.local_state.take_query_stack();
+        let from_id = self.id();
+
+        // Make a "dummy stack frame". As we iterate through the cycle, we will collect the
+        // inputs from each participant. Then, if we are participating in cycle recovery, we
+        // will propagate those results to all participants.
+        let mut cycle_query = ActiveQuery::new(database_key_index);
+
+        // Identify the cycle participants:
+        let cycle = {
+            let mut v = vec![];
+            dg.for_each_cycle_participant(
+                from_id,
+                &mut from_stack,
+                database_key_index,
+                to_id,
+                |aqs| {
+                    aqs.iter_mut().for_each(|aq| {
+                        cycle_query.add_from(aq);
+                        v.push(aq.database_key_index);
+                    });
+                },
+            );
+
+            // We want to give the participants a deterministic order
+            // (at least for this execution, not necessarily across executions),
+            // no matter where it started on the stack. Find the minimum
+            // key and rotate it to the front.
+            let index = v.iter().position_min().unwrap_or_default();
+            v.rotate_left(index);
+
+            // No need to store extra memory.
+            v.shrink_to_fit();
+
+            Cycle::new(Arc::new(v))
+        };
+        debug!("cycle {:?}, cycle_query {:#?}", cycle.debug(db), cycle_query,);
+
+        // We can remove the cycle participants from the list of dependencies;
+        // they are a strongly connected component (SCC) and we only care about
+        // dependencies to things outside the SCC that control whether it will
+        // form again.
+        cycle_query.remove_cycle_participants(&cycle);
+
+        // Mark each cycle participant that has recovery set, along with
+        // any frames that come after them on the same thread. Those frames
+        // are going to be unwound so that fallback can occur.
+        dg.for_each_cycle_participant(from_id, &mut from_stack, database_key_index, to_id, |aqs| {
+            aqs.iter_mut()
+                .skip_while(|aq| match db.cycle_recovery_strategy(aq.database_key_index) {
+                    CycleRecoveryStrategy::Panic => true,
+                    CycleRecoveryStrategy::Fallback => false,
+                })
+                .for_each(|aq| {
+                    debug!("marking {:?} for fallback", aq.database_key_index.debug(db));
+                    aq.take_inputs_from(&cycle_query);
+                    assert!(aq.cycle.is_none());
+                    aq.cycle = Some(cycle.clone());
+                });
+        });
+
+        // Unblock every thread that has cycle recovery with a `WaitResult::Cycle`.
+        // They will throw the cycle, which will be caught by the frame that has
+        // cycle recovery so that it can execute that recovery.
+        let (me_recovered, others_recovered) =
+            dg.maybe_unblock_runtimes_in_cycle(from_id, &from_stack, database_key_index, to_id);
+
+        self.local_state.restore_query_stack(from_stack);
+
+        if me_recovered {
+            // If the current thread has recovery, we want to throw
+            // so that it can begin.
+            cycle.throw()
+        } else if others_recovered {
+            // If other threads have recovery but we didn't: return and we will block on them.
+        } else {
+            // if nobody has recovery, then we panic
+            panic_any(cycle);
+        }
+    }
+
+    /// Block until `other_id` completes executing `database_key`;
+    /// panic or unwind in the case of a cycle.
+    ///
+    /// `query_mutex_guard` is the guard for the current query's state;
+    /// it will be dropped after we have successfully registered the
+    /// dependency.
+    ///
+    /// # Propagating panics
+    ///
+    /// If the thread `other_id` panics, then our thread is considered
+    /// cancelled, so this function will panic with a `Cancelled` value.
+    ///
+    /// # Cycle handling
+    ///
+    /// If the thread `other_id` already depends on the current thread,
+    /// and hence there is a cycle in the query graph, then this function
+    /// will unwind instead of returning normally. The method of unwinding
+    /// depends on the [`CycleRecoveryStrategy`]
+    /// of the cycle participants:
+    ///
+    /// * [`CycleRecoveryStrategy::Panic`]: panic with the [`Cycle`] as the value.
+    /// * [`CycleRecoveryStrategy::Fallback`]: initiate unwinding with [`Cycle::throw`].
+    pub(crate) fn block_on_or_unwind<QueryMutexGuard>(
+        &self,
+        db: &dyn Database,
+        database_key: DatabaseKeyIndex,
+        other_id: RuntimeId,
+        query_mutex_guard: QueryMutexGuard,
+    ) {
+        let mut dg = self.shared_state.dependency_graph.lock();
+
+        if dg.depends_on(other_id, self.id()) {
+            self.unblock_cycle_and_maybe_throw(db, &mut dg, database_key, other_id);
+
+            // If the above fn returns, then (via cycle recovery) it has unblocked the
+            // cycle, so we can continue.
+            assert!(!dg.depends_on(other_id, self.id()));
+        }
+
+        db.salsa_event(Event {
+            runtime_id: self.id(),
+            kind: EventKind::WillBlockOn { other_runtime_id: other_id, database_key },
+        });
+
+        let stack = self.local_state.take_query_stack();
+
+        let (stack, result) = DependencyGraph::block_on(
+            dg,
+            self.id(),
+            database_key,
+            other_id,
+            stack,
+            query_mutex_guard,
+        );
+
+        self.local_state.restore_query_stack(stack);
+
+        match result {
+            WaitResult::Completed => (),
+
+            // If the other thread panicked, then we consider this thread
+            // cancelled. The assumption is that the panic will be detected
+            // by the other thread and responded to appropriately.
+            WaitResult::Panicked => Cancelled::PropagatedPanic.throw(),
+
+            WaitResult::Cycle(c) => c.throw(),
+        }
+    }
+
+    /// Invoked when this runtime completed computing `database_key` with
+    /// the given result `wait_result` (`wait_result` should be
+    /// `WaitResult::Panicked` if computing `database_key` panicked and
+    /// could not complete).
+    /// This function unblocks any dependent queries and allows them
+    /// to continue executing.
+    pub(crate) fn unblock_queries_blocked_on(
+        &self,
+        database_key: DatabaseKeyIndex,
+        wait_result: WaitResult,
+    ) {
+        self.shared_state
+            .dependency_graph
+            .lock()
+            .unblock_runtimes_blocked_on(database_key, wait_result);
+    }
+}
+
+/// State that will be common to all threads (when we support multiple threads)
+struct SharedState {
+    /// Stores the next id to use for a snapshotted runtime (starts at 1).
+    next_id: AtomicU32,
+
+    /// Whenever derived queries are executing, they acquire this lock
+    /// in read mode. Mutating inputs (and thus creating a new
+    /// revision) requires a write lock (thus guaranteeing that no
+    /// derived queries are in progress). Note that this is not needed
+    /// to prevent **race conditions** -- the revision counter itself
+    /// is stored atomically (`AtomicRevision`) so it can be cheaply read
+    /// without acquiring the lock. Rather, the `query_lock` is used
+    /// to ensure a higher-level consistency property.
+    query_lock: RwLock<()>,
+
+    /// This is typically equal to `revision` -- set to `revision+1`
+    /// when a new revision is pending (which implies that the current
+    /// revision is cancelled).
+    pending_revision: AtomicRevision,
+
+    /// Stores the "last change" revision for values of each Durability.
+    /// This array is always of length at least 1 (for Durability 0)
+    /// but its total length depends on the number of Durabilities. The
+    /// element at index 0 is special as it represents the "current
+    /// revision". In general, we have the invariant that revisions
+    /// in here are *declining* -- that is, `revisions[i] >=
+    /// revisions[i + 1]`, for all `i`. This is because when you
+    /// modify a value with durability D, that implies that values
+    /// with durability less than D may have changed too.
+    revisions: [AtomicRevision; Durability::LEN],
+
+    /// The dependency graph tracks which runtimes are blocked on one
+    /// another, waiting for queries to terminate.
+    dependency_graph: Mutex<DependencyGraph>,
+}
+
+// NOTE(review): manually asserts unwind safety despite the interior
+// mutability (locks/atomics above) -- presumably because a panic cannot
+// leave `SharedState` observably torn; confirm against upstream salsa.
+impl std::panic::RefUnwindSafe for SharedState {}
+
+impl Default for SharedState {
+    /// Initial shared state: the snapshot-id counter begins at 1 (id 0
+    /// is reserved for the base runtime) and every revision counter
+    /// begins at the starting revision.
+    fn default() -> Self {
+        // A `const` is needed so the value can be repeated in the array
+        // initializer; the clippy lint is a false positive here.
+        #[allow(clippy::declare_interior_mutable_const)]
+        const START: AtomicRevision = AtomicRevision::start();
+        SharedState {
+            next_id: AtomicU32::new(1),
+            pending_revision: START,
+            revisions: [START; Durability::LEN],
+            query_lock: RwLock::default(),
+            dependency_graph: Mutex::default(),
+        }
+    }
+}
+
+impl std::fmt::Debug for SharedState {
+    /// Debug output summarizes the query-lock state plus the revision
+    /// counters; the dependency graph is intentionally omitted.
+    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // Describe the lock state without acquiring the lock.
+        let query_lock = match () {
+            _ if self.query_lock.is_locked_exclusive() => "<wlocked>",
+            _ if self.query_lock.is_locked() => "<rlocked>",
+            _ => "<unlocked>",
+        };
+        fmt.debug_struct("SharedState")
+            .field("query_lock", &query_lock)
+            .field("revisions", &self.revisions)
+            .field("pending_revision", &self.pending_revision)
+            .finish()
+    }
+}
+
+/// Book-keeping for one in-progress query on this thread's stack.
+#[derive(Debug)]
+struct ActiveQuery {
+    /// What query is executing
+    database_key_index: DatabaseKeyIndex,
+
+    /// Minimum durability of inputs observed so far.
+    durability: Durability,
+
+    /// Maximum revision of all inputs observed. If we observe an
+    /// untracked read, this will be set to the most recent revision.
+    changed_at: Revision,
+
+    /// Set of subqueries that were accessed thus far, or `None` if
+    /// there was an untracked read.
+    dependencies: Option<FxIndexSet<DatabaseKeyIndex>>,
+
+    /// Stores the entire cycle, if one is found and this query is part of it.
+    cycle: Option<Cycle>,
+}
+
+impl ActiveQuery {
+    /// Fresh frame for `database_key_index`: maximal durability, the
+    /// starting revision, an empty dependency set, and no cycle.
+    fn new(database_key_index: DatabaseKeyIndex) -> Self {
+        ActiveQuery {
+            database_key_index,
+            cycle: None,
+            dependencies: Some(FxIndexSet::default()),
+            durability: Durability::MAX,
+            changed_at: Revision::start(),
+        }
+    }
+
+    /// Records that this query read `input`, lowering our durability to
+    /// at most `durability` and raising `changed_at` to at least `revision`.
+    fn add_read(&mut self, input: DatabaseKeyIndex, durability: Durability, revision: Revision) {
+        if let Some(deps) = self.dependencies.as_mut() {
+            deps.insert(input);
+        }
+
+        if durability < self.durability {
+            self.durability = durability;
+        }
+        if revision > self.changed_at {
+            self.changed_at = revision;
+        }
+    }
+
+    /// Records a read of state unknown to salsa: dependencies become
+    /// untracked, durability drops to the minimum, and `changed_at` is
+    /// pinned to the given revision.
+    fn add_untracked_read(&mut self, changed_at: Revision) {
+        self.dependencies = None;
+        self.durability = Durability::LOW;
+        self.changed_at = changed_at;
+    }
+
+    /// Like an untracked read, but caps durability at `durability`
+    /// instead of forcing it to the minimum.
+    fn add_synthetic_read(&mut self, durability: Durability, revision: Revision) {
+        self.dependencies = None;
+        if durability < self.durability {
+            self.durability = durability;
+        }
+        if revision > self.changed_at {
+            self.changed_at = revision;
+        }
+    }
+
+    /// Packages the accumulated dependency information into a
+    /// `QueryRevisions`; an empty dependency set is stored as `None`.
+    pub(crate) fn revisions(&self) -> QueryRevisions {
+        let mut inputs = None;
+        let mut untracked = false;
+        match &self.dependencies {
+            None => untracked = true,
+            Some(deps) => {
+                if !deps.is_empty() {
+                    inputs = Some(ThinArc::from_header_and_iter((), deps.iter().copied()));
+                }
+            }
+        }
+
+        QueryRevisions {
+            changed_at: self.changed_at,
+            durability: self.durability,
+            inputs,
+            untracked,
+        }
+    }
+
+    /// Adds any dependencies from `other` into `self`.
+    /// Used during cycle recovery, see [`Runtime::unblock_cycle_and_maybe_throw`].
+    fn add_from(&mut self, other: &ActiveQuery) {
+        if other.changed_at > self.changed_at {
+            self.changed_at = other.changed_at;
+        }
+        if other.durability < self.durability {
+            self.durability = other.durability;
+        }
+        // An untracked read in `other` makes us untracked too.
+        match (&mut self.dependencies, &other.dependencies) {
+            (Some(mine), Some(theirs)) => mine.extend(theirs.iter().copied()),
+            (_, None) => self.dependencies = None,
+            (None, Some(_)) => {}
+        }
+    }
+
+    /// Removes the participants in `cycle` from my dependencies.
+    /// Used during cycle recovery, see [`Runtime::unblock_cycle_and_maybe_throw`].
+    fn remove_cycle_participants(&mut self, cycle: &Cycle) {
+        let deps = match self.dependencies.as_mut() {
+            Some(deps) => deps,
+            None => return,
+        };
+        for key in cycle.participant_keys() {
+            deps.swap_remove(&key);
+        }
+    }
+
+    /// Copy the changed-at, durability, and dependencies from `cycle_query`.
+    /// Used during cycle recovery, see [`Runtime::unblock_cycle_and_maybe_throw`].
+    pub(crate) fn take_inputs_from(&mut self, cycle_query: &ActiveQuery) {
+        self.changed_at = cycle_query.changed_at;
+        self.durability = cycle_query.durability;
+        // `clone_from` reuses our existing allocation where possible.
+        self.dependencies.clone_from(&cycle_query.dependencies);
+    }
+}
+
+/// A unique identifier for a particular runtime. Each time you create
+/// a snapshot, a fresh `RuntimeId` is generated. Once a snapshot is
+/// complete, its `RuntimeId` may potentially be re-used.
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
+pub struct RuntimeId {
+    /// 0 for the base runtime; snapshots take values handed out by
+    /// `SharedState::next_id` (starting at 1).
+    counter: u32,
+}
+
+/// A value paired with the durability/revision metadata used for
+/// dependency tracking.
+#[derive(Clone, Debug)]
+pub(crate) struct StampedValue<V> {
+    /// The value itself.
+    pub(crate) value: V,
+    /// Durability of the inputs that produced `value`.
+    pub(crate) durability: Durability,
+    /// Revision in which `value` last changed.
+    pub(crate) changed_at: Revision,
+}
+
+/// Guard held by a forked (snapshot) runtime: keeps a recursive
+/// read-lock on `SharedState::query_lock`, released on drop.
+struct RevisionGuard {
+    shared_state: Arc<SharedState>,
+}
+
+impl RevisionGuard {
+    /// Acquires the global query lock in (recursive) read mode and
+    /// returns a guard that releases it when dropped.
+    fn new(shared_state: &Arc<SharedState>) -> Self {
+        // Subtle: we use a "recursive" lock here so that it is not an
+        // error to acquire a read-lock when one is already held (this
+        // happens when a query uses `snapshot` to spawn off parallel
+        // workers, for example).
+        //
+        // This has the side-effect that we are responsible to ensure
+        // that people contending for the write lock do not starve,
+        // but this is what we achieve via the cancellation mechanism.
+        //
+        // (In particular, since we only ever have one "mutating
+        // handle" to the database, the only contention for the global
+        // query lock occurs when there are "futures" evaluating
+        // queries in parallel, and those futures hold a read-lock
+        // already, so the starvation problem is more about them bringing
+        // themselves to a close, versus preventing other people from
+        // *starting* work).
+        //
+        // SAFETY: every shared acquisition made here is paired with
+        // exactly one `unlock_shared` in `Drop for RevisionGuard`.
+        unsafe {
+            shared_state.query_lock.raw().lock_shared_recursive();
+        }
+
+        Self { shared_state: shared_state.clone() }
+    }
+}
+
+impl Drop for RevisionGuard {
+    fn drop(&mut self) {
+        // Release our read-lock without using RAII. As documented in
+        // `RevisionGuard::new` above, this requires the unsafe keyword.
+        //
+        // SAFETY: the shared lock was acquired in `RevisionGuard::new`
+        // and this guard releases it exactly once.
+        unsafe {
+            self.shared_state.query_lock.raw().unlock_shared();
+        }
+    }
+}