Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'crates/ra-salsa/src/runtime.rs')
| -rw-r--r-- | crates/ra-salsa/src/runtime.rs | 668 |
1 files changed, 668 insertions, 0 deletions
diff --git a/crates/ra-salsa/src/runtime.rs b/crates/ra-salsa/src/runtime.rs new file mode 100644 index 0000000000..5fe5f4b46d --- /dev/null +++ b/crates/ra-salsa/src/runtime.rs @@ -0,0 +1,668 @@ +use crate::durability::Durability; +use crate::hash::FxIndexSet; +use crate::plumbing::CycleRecoveryStrategy; +use crate::revision::{AtomicRevision, Revision}; +use crate::{Cancelled, Cycle, Database, DatabaseKeyIndex, Event, EventKind}; +use itertools::Itertools; +use parking_lot::lock_api::{RawRwLock, RawRwLockRecursive}; +use parking_lot::{Mutex, RwLock}; +use std::hash::Hash; +use std::panic::panic_any; +use std::sync::atomic::{AtomicU32, Ordering}; +use tracing::debug; +use triomphe::{Arc, ThinArc}; + +mod dependency_graph; +use dependency_graph::DependencyGraph; + +pub(crate) mod local_state; +use local_state::LocalState; + +use self::local_state::{ActiveQueryGuard, QueryRevisions}; + +/// The salsa runtime stores the storage for all queries as well as +/// tracking the query stack and dependencies between cycles. +/// +/// Each new runtime you create (e.g., via `Runtime::new` or +/// `Runtime::default`) will have an independent set of query storage +/// associated with it. Normally, therefore, you only do this once, at +/// the start of your application. +pub struct Runtime { + /// Our unique runtime id. + id: RuntimeId, + + /// If this is a "forked" runtime, then the `revision_guard` will + /// be `Some`; this guard holds a read-lock on the global query + /// lock. + revision_guard: Option<RevisionGuard>, + + /// Local state that is specific to this runtime (thread). + local_state: LocalState, + + /// Shared state that is accessible via all runtimes. + shared_state: Arc<SharedState>, +} + +#[derive(Clone, Debug)] +pub(crate) enum WaitResult { + Completed, + Panicked, + Cycle(Cycle), +} + +impl Default for Runtime { + fn default() -> Self { + Runtime { + id: RuntimeId { counter: 0 }, + revision_guard: None, + shared_state: Default::default(), + local_state: Default::default(), + } + } +} + +impl std::fmt::Debug for Runtime { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fmt.debug_struct("Runtime") + .field("id", &self.id()) + .field("forked", &self.revision_guard.is_some()) + .field("shared_state", &self.shared_state) + .finish() + } +} + +impl Runtime { + /// Create a new runtime; equivalent to `Self::default`. This is + /// used when creating a new database. + pub fn new() -> Self { + Self::default() + } + + /// See [`crate::storage::Storage::snapshot`]. + pub(crate) fn snapshot(&self) -> Self { + if self.local_state.query_in_progress() { + panic!("it is not legal to `snapshot` during a query (see salsa-rs/salsa#80)"); + } + + let revision_guard = RevisionGuard::new(&self.shared_state); + + let id = RuntimeId { counter: self.shared_state.next_id.fetch_add(1, Ordering::SeqCst) }; + + Runtime { + id, + revision_guard: Some(revision_guard), + shared_state: self.shared_state.clone(), + local_state: Default::default(), + } + } + + /// A "synthetic write" causes the system to act *as though* some + /// input of durability `durability` has changed. This is mostly + /// useful for profiling scenarios. + /// + /// **WARNING:** Just like an ordinary write, this method triggers + /// cancellation. If you invoke it while a snapshot exists, it + /// will block until that snapshot is dropped -- if that snapshot + /// is owned by the current thread, this could trigger deadlock. + pub fn synthetic_write(&mut self, durability: Durability) { + self.with_incremented_revision(|_next_revision| Some(durability)); + } + + /// The unique identifier attached to this `SalsaRuntime`. Each + /// snapshotted runtime has a distinct identifier. + #[inline] + pub fn id(&self) -> RuntimeId { + self.id + } + + /// Returns the database-key for the query that this thread is + /// actively executing (if any). + pub fn active_query(&self) -> Option<DatabaseKeyIndex> { + self.local_state.active_query() + } + + /// Read current value of the revision counter. + #[inline] + pub(crate) fn current_revision(&self) -> Revision { + self.shared_state.revisions[0].load() + } + + /// The revision in which values with durability `d` may have last + /// changed. For D0, this is just the current revision. But for + /// higher levels of durability, this value may lag behind the + /// current revision. If we encounter a value of durability Di, + /// then, we can check this function to get a "bound" on when the + /// value may have changed, which allows us to skip walking its + /// dependencies. + #[inline] + pub(crate) fn last_changed_revision(&self, d: Durability) -> Revision { + self.shared_state.revisions[d.index()].load() + } + + /// Read current value of the revision counter. + #[inline] + pub(crate) fn pending_revision(&self) -> Revision { + self.shared_state.pending_revision.load() + } + + #[cold] + pub(crate) fn unwind_cancelled(&self) { + self.report_untracked_read(); + Cancelled::PendingWrite.throw(); + } + + /// Acquires the **global query write lock** (ensuring that no queries are + /// executing) and then increments the current revision counter; invokes + /// `op` with the global query write lock still held. + /// + /// While we wait to acquire the global query write lock, this method will + /// also increment `pending_revision_increments`, thus signalling to queries + /// that their results are "cancelled" and they should abort as expeditiously + /// as possible. + /// + /// The `op` closure should actually perform the writes needed. It is given + /// the new revision as an argument, and its return value indicates whether + /// any pre-existing value was modified: + /// + /// - returning `None` means that no pre-existing value was modified (this + /// could occur e.g. when setting some key on an input that was never set + /// before) + /// - returning `Some(d)` indicates that a pre-existing value was modified + /// and it had the durability `d`. This will update the records for when + /// values with each durability were modified. + /// + /// Note that, given our writer model, we can assume that only one thread is + /// attempting to increment the global revision at a time. + pub(crate) fn with_incremented_revision<F>(&mut self, op: F) + where + F: FnOnce(Revision) -> Option<Durability>, + { + tracing::debug!("increment_revision()"); + + if !self.permits_increment() { + panic!("increment_revision invoked during a query computation"); + } + + // Set the `pending_revision` field so that people + // know current revision is cancelled. + let current_revision = self.shared_state.pending_revision.fetch_then_increment(); + + // To modify the revision, we need the lock. + let shared_state = self.shared_state.clone(); + let _lock = shared_state.query_lock.write(); + + let old_revision = self.shared_state.revisions[0].fetch_then_increment(); + assert_eq!(current_revision, old_revision); + + let new_revision = current_revision.next(); + + debug!("increment_revision: incremented to {:?}", new_revision); + + if let Some(d) = op(new_revision) { + for rev in &self.shared_state.revisions[1..=d.index()] { + rev.store(new_revision); + } + } + } + + pub(crate) fn permits_increment(&self) -> bool { + self.revision_guard.is_none() && !self.local_state.query_in_progress() + } + + #[inline] + pub(crate) fn push_query(&self, database_key_index: DatabaseKeyIndex) -> ActiveQueryGuard<'_> { + self.local_state.push_query(database_key_index) + } + + /// Reports that the currently active query read the result from + /// another query. + /// + /// Also checks whether the "cycle participant" flag is set on + /// the current stack frame -- if so, panics with `CycleParticipant` + /// value, which should be caught by the code executing the query. + /// + /// # Parameters + /// + /// - `database_key`: the query whose result was read + /// - `changed_revision`: the last revision in which the result of that + /// query had changed + pub(crate) fn report_query_read_and_unwind_if_cycle_resulted( + &self, + input: DatabaseKeyIndex, + durability: Durability, + changed_at: Revision, + ) { + self.local_state + .report_query_read_and_unwind_if_cycle_resulted(input, durability, changed_at); + } + + /// Reports that the query depends on some state unknown to salsa. + /// + /// Queries which report untracked reads will be re-executed in the next + /// revision. + pub fn report_untracked_read(&self) { + self.local_state.report_untracked_read(self.current_revision()); + } + + /// Acts as though the current query had read an input with the given durability; this will force the current query's durability to be at most `durability`. + /// + /// This is mostly useful to control the durability level for [on-demand inputs](https://salsa-rs.github.io/salsa/common_patterns/on_demand_inputs.html). + pub fn report_synthetic_read(&self, durability: Durability) { + let changed_at = self.last_changed_revision(durability); + self.local_state.report_synthetic_read(durability, changed_at); + } + + /// Handles a cycle in the dependency graph that was detected when the + /// current thread tried to block on `database_key_index` which is being + /// executed by `to_id`. If this function returns, then `to_id` no longer + /// depends on the current thread, and so we should continue executing + /// as normal. Otherwise, the function will throw a `Cycle` which is expected + /// to be caught by some frame on our stack. This occurs either if there is + /// a frame on our stack with cycle recovery (possibly the top one!) or if there + /// is no cycle recovery at all. + fn unblock_cycle_and_maybe_throw( + &self, + db: &dyn Database, + dg: &mut DependencyGraph, + database_key_index: DatabaseKeyIndex, + to_id: RuntimeId, + ) { + debug!("unblock_cycle_and_maybe_throw(database_key={:?})", database_key_index); + + let mut from_stack = self.local_state.take_query_stack(); + let from_id = self.id(); + + // Make a "dummy stack frame". As we iterate through the cycle, we will collect the + // inputs from each participant. Then, if we are participating in cycle recovery, we + // will propagate those results to all participants. + let mut cycle_query = ActiveQuery::new(database_key_index); + + // Identify the cycle participants: + let cycle = { + let mut v = vec![]; + dg.for_each_cycle_participant( + from_id, + &mut from_stack, + database_key_index, + to_id, + |aqs| { + aqs.iter_mut().for_each(|aq| { + cycle_query.add_from(aq); + v.push(aq.database_key_index); + }); + }, + ); + + // We want to give the participants in a deterministic order + // (at least for this execution, not necessarily across executions), + // no matter where it started on the stack. Find the minimum + // key and rotate it to the front. + let index = v.iter().position_min().unwrap_or_default(); + v.rotate_left(index); + + // No need to store extra memory. + v.shrink_to_fit(); + + Cycle::new(Arc::new(v)) + }; + debug!("cycle {:?}, cycle_query {:#?}", cycle.debug(db), cycle_query,); + + // We can remove the cycle participants from the list of dependencies; + // they are a strongly connected component (SCC) and we only care about + // dependencies to things outside the SCC that control whether it will + // form again. + cycle_query.remove_cycle_participants(&cycle); + + // Mark each cycle participant that has recovery set, along with + // any frames that come after them on the same thread. Those frames + // are going to be unwound so that fallback can occur. + dg.for_each_cycle_participant(from_id, &mut from_stack, database_key_index, to_id, |aqs| { + aqs.iter_mut() + .skip_while(|aq| match db.cycle_recovery_strategy(aq.database_key_index) { + CycleRecoveryStrategy::Panic => true, + CycleRecoveryStrategy::Fallback => false, + }) + .for_each(|aq| { + debug!("marking {:?} for fallback", aq.database_key_index.debug(db)); + aq.take_inputs_from(&cycle_query); + assert!(aq.cycle.is_none()); + aq.cycle = Some(cycle.clone()); + }); + }); + + // Unblock every thread that has cycle recovery with a `WaitResult::Cycle`. + // They will throw the cycle, which will be caught by the frame that has + // cycle recovery so that it can execute that recovery. + let (me_recovered, others_recovered) = + dg.maybe_unblock_runtimes_in_cycle(from_id, &from_stack, database_key_index, to_id); + + self.local_state.restore_query_stack(from_stack); + + if me_recovered { + // If the current thread has recovery, we want to throw + // so that it can begin. + cycle.throw() + } else if others_recovered { + // If other threads have recovery but we didn't: return and we will block on them. + } else { + // if nobody has recover, then we panic + panic_any(cycle); + } + } + + /// Block until `other_id` completes executing `database_key`; + /// panic or unwind in the case of a cycle. + /// + /// `query_mutex_guard` is the guard for the current query's state; + /// it will be dropped after we have successfully registered the + /// dependency. + /// + /// # Propagating panics + /// + /// If the thread `other_id` panics, then our thread is considered + /// cancelled, so this function will panic with a `Cancelled` value. + /// + /// # Cycle handling + /// + /// If the thread `other_id` already depends on the current thread, + /// and hence there is a cycle in the query graph, then this function + /// will unwind instead of returning normally. The method of unwinding + /// depends on the [`Self::mutual_cycle_recovery_strategy`] + /// of the cycle participants: + /// + /// * [`CycleRecoveryStrategy::Panic`]: panic with the [`Cycle`] as the value. + /// * [`CycleRecoveryStrategy::Fallback`]: initiate unwinding with [`CycleParticipant::unwind`]. + pub(crate) fn block_on_or_unwind<QueryMutexGuard>( + &self, + db: &dyn Database, + database_key: DatabaseKeyIndex, + other_id: RuntimeId, + query_mutex_guard: QueryMutexGuard, + ) { + let mut dg = self.shared_state.dependency_graph.lock(); + + if dg.depends_on(other_id, self.id()) { + self.unblock_cycle_and_maybe_throw(db, &mut dg, database_key, other_id); + + // If the above fn returns, then (via cycle recovery) it has unblocked the + // cycle, so we can continue. + assert!(!dg.depends_on(other_id, self.id())); + } + + db.salsa_event(Event { + runtime_id: self.id(), + kind: EventKind::WillBlockOn { other_runtime_id: other_id, database_key }, + }); + + let stack = self.local_state.take_query_stack(); + + let (stack, result) = DependencyGraph::block_on( + dg, + self.id(), + database_key, + other_id, + stack, + query_mutex_guard, + ); + + self.local_state.restore_query_stack(stack); + + match result { + WaitResult::Completed => (), + + // If the other thread panicked, then we consider this thread + // cancelled. The assumption is that the panic will be detected + // by the other thread and responded to appropriately. + WaitResult::Panicked => Cancelled::PropagatedPanic.throw(), + + WaitResult::Cycle(c) => c.throw(), + } + } + + /// Invoked when this runtime completed computing `database_key` with + /// the given result `wait_result` (`wait_result` should be `None` if + /// computing `database_key` panicked and could not complete). + /// This function unblocks any dependent queries and allows them + /// to continue executing. + pub(crate) fn unblock_queries_blocked_on( + &self, + database_key: DatabaseKeyIndex, + wait_result: WaitResult, + ) { + self.shared_state + .dependency_graph + .lock() + .unblock_runtimes_blocked_on(database_key, wait_result); + } +} + +/// State that will be common to all threads (when we support multiple threads) +struct SharedState { + /// Stores the next id to use for a snapshotted runtime (starts at 1). + next_id: AtomicU32, + + /// Whenever derived queries are executing, they acquire this lock + /// in read mode. Mutating inputs (and thus creating a new + /// revision) requires a write lock (thus guaranteeing that no + /// derived queries are in progress). Note that this is not needed + /// to prevent **race conditions** -- the revision counter itself + /// is stored in an `AtomicUsize` so it can be cheaply read + /// without acquiring the lock. Rather, the `query_lock` is used + /// to ensure a higher-level consistency property. + query_lock: RwLock<()>, + + /// This is typically equal to `revision` -- set to `revision+1` + /// when a new revision is pending (which implies that the current + /// revision is cancelled). + pending_revision: AtomicRevision, + + /// Stores the "last change" revision for values of each Durability. + /// This vector is always of length at least 1 (for Durability 0) + /// but its total length depends on the number of Durabilities. The + /// element at index 0 is special as it represents the "current + /// revision". In general, we have the invariant that revisions + /// in here are *declining* -- that is, `revisions[i] >= + /// revisions[i + 1]`, for all `i`. This is because when you + /// modify a value with durability D, that implies that values + /// with durability less than D may have changed too. + revisions: [AtomicRevision; Durability::LEN], + + /// The dependency graph tracks which runtimes are blocked on one + /// another, waiting for queries to terminate. + dependency_graph: Mutex<DependencyGraph>, +} + +impl std::panic::RefUnwindSafe for SharedState {} + +impl Default for SharedState { + fn default() -> Self { + #[allow(clippy::declare_interior_mutable_const)] + const START: AtomicRevision = AtomicRevision::start(); + SharedState { + next_id: AtomicU32::new(1), + query_lock: Default::default(), + revisions: [START; Durability::LEN], + pending_revision: START, + dependency_graph: Default::default(), + } + } +} + +impl std::fmt::Debug for SharedState { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let query_lock = if self.query_lock.is_locked_exclusive() { + "<wlocked>" + } else if self.query_lock.is_locked() { + "<rlocked>" + } else { + "<unlocked>" + }; + fmt.debug_struct("SharedState") + .field("query_lock", &query_lock) + .field("revisions", &self.revisions) + .field("pending_revision", &self.pending_revision) + .finish() + } +} + +#[derive(Debug)] +struct ActiveQuery { + /// What query is executing + database_key_index: DatabaseKeyIndex, + + /// Minimum durability of inputs observed so far. + durability: Durability, + + /// Maximum revision of all inputs observed. If we observe an + /// untracked read, this will be set to the most recent revision. + changed_at: Revision, + + /// Set of subqueries that were accessed thus far, or `None` if + /// there was an untracked the read. + dependencies: Option<FxIndexSet<DatabaseKeyIndex>>, + + /// Stores the entire cycle, if one is found and this query is part of it. + cycle: Option<Cycle>, +} + +impl ActiveQuery { + fn new(database_key_index: DatabaseKeyIndex) -> Self { + ActiveQuery { + database_key_index, + durability: Durability::MAX, + changed_at: Revision::start(), + dependencies: Some(FxIndexSet::default()), + cycle: None, + } + } + + fn add_read(&mut self, input: DatabaseKeyIndex, durability: Durability, revision: Revision) { + if let Some(set) = &mut self.dependencies { + set.insert(input); + } + + self.durability = self.durability.min(durability); + self.changed_at = self.changed_at.max(revision); + } + + fn add_untracked_read(&mut self, changed_at: Revision) { + self.dependencies = None; + self.durability = Durability::LOW; + self.changed_at = changed_at; + } + + fn add_synthetic_read(&mut self, durability: Durability, revision: Revision) { + self.dependencies = None; + self.durability = self.durability.min(durability); + self.changed_at = self.changed_at.max(revision); + } + + pub(crate) fn revisions(&self) -> QueryRevisions { + let (inputs, untracked) = match &self.dependencies { + None => (None, true), + + Some(dependencies) => ( + if dependencies.is_empty() { + None + } else { + Some(ThinArc::from_header_and_iter((), dependencies.iter().copied())) + }, + false, + ), + }; + + QueryRevisions { + changed_at: self.changed_at, + inputs, + untracked, + durability: self.durability, + } + } + + /// Adds any dependencies from `other` into `self`. + /// Used during cycle recovery, see [`Runtime::create_cycle_error`]. + fn add_from(&mut self, other: &ActiveQuery) { + self.changed_at = self.changed_at.max(other.changed_at); + self.durability = self.durability.min(other.durability); + if let Some(other_dependencies) = &other.dependencies { + if let Some(my_dependencies) = &mut self.dependencies { + my_dependencies.extend(other_dependencies.iter().copied()); + } + } else { + self.dependencies = None; + } + } + + /// Removes the participants in `cycle` from my dependencies. + /// Used during cycle recovery, see [`Runtime::create_cycle_error`]. + fn remove_cycle_participants(&mut self, cycle: &Cycle) { + if let Some(my_dependencies) = &mut self.dependencies { + for p in cycle.participant_keys() { + my_dependencies.swap_remove(&p); + } + } + } + + /// Copy the changed-at, durability, and dependencies from `cycle_query`. + /// Used during cycle recovery, see [`Runtime::create_cycle_error`]. + pub(crate) fn take_inputs_from(&mut self, cycle_query: &ActiveQuery) { + self.changed_at = cycle_query.changed_at; + self.durability = cycle_query.durability; + self.dependencies.clone_from(&cycle_query.dependencies); + } +} + +/// A unique identifier for a particular runtime. Each time you create +/// a snapshot, a fresh `RuntimeId` is generated. Once a snapshot is +/// complete, its `RuntimeId` may potentially be re-used. +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct RuntimeId { + counter: u32, +} + +#[derive(Clone, Debug)] +pub(crate) struct StampedValue<V> { + pub(crate) value: V, + pub(crate) durability: Durability, + pub(crate) changed_at: Revision, +} + +struct RevisionGuard { + shared_state: Arc<SharedState>, +} + +impl RevisionGuard { + fn new(shared_state: &Arc<SharedState>) -> Self { + // Subtle: we use a "recursive" lock here so that it is not an + // error to acquire a read-lock when one is already held (this + // happens when a query uses `snapshot` to spawn off parallel + // workers, for example). + // + // This has the side-effect that we are responsible to ensure + // that people contending for the write lock do not starve, + // but this is what we achieve via the cancellation mechanism. + // + // (In particular, since we only ever have one "mutating + // handle" to the database, the only contention for the global + // query lock occurs when there are "futures" evaluating + // queries in parallel, and those futures hold a read-lock + // already, so the starvation problem is more about them bring + // themselves to a close, versus preventing other people from + // *starting* work). + unsafe { + shared_state.query_lock.raw().lock_shared_recursive(); + } + + Self { shared_state: shared_state.clone() } + } +} + +impl Drop for RevisionGuard { + fn drop(&mut self) { + // Release our read-lock without using RAII. As documented in + // `Snapshot::new` above, this requires the unsafe keyword. + unsafe { + self.shared_state.query_lock.raw().unlock_shared(); + } + } +} |