Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'crates/ra-salsa/src/runtime.rs')
| -rw-r--r-- | crates/ra-salsa/src/runtime.rs | 668 |
1 files changed, 0 insertions, 668 deletions
diff --git a/crates/ra-salsa/src/runtime.rs b/crates/ra-salsa/src/runtime.rs deleted file mode 100644 index cb16ba0044..0000000000 --- a/crates/ra-salsa/src/runtime.rs +++ /dev/null @@ -1,668 +0,0 @@ -use crate::durability::Durability; -use crate::hash::FxIndexSet; -use crate::plumbing::CycleRecoveryStrategy; -use crate::revision::{AtomicRevision, Revision}; -use crate::{Cancelled, Cycle, Database, DatabaseKeyIndex, Event, EventKind}; -use itertools::Itertools; -use parking_lot::lock_api::{RawRwLock, RawRwLockRecursive}; -use parking_lot::{Mutex, RwLock}; -use std::hash::Hash; -use std::panic::panic_any; -use std::sync::atomic::{AtomicU32, Ordering}; -use tracing::trace; -use triomphe::{Arc, ThinArc}; - -mod dependency_graph; -use dependency_graph::DependencyGraph; - -pub(crate) mod local_state; -use local_state::LocalState; - -use self::local_state::{ActiveQueryGuard, QueryRevisions}; - -/// The salsa runtime stores the storage for all queries as well as -/// tracking the query stack and dependencies between cycles. -/// -/// Each new runtime you create (e.g., via `Runtime::new` or -/// `Runtime::default`) will have an independent set of query storage -/// associated with it. Normally, therefore, you only do this once, at -/// the start of your application. -pub struct Runtime { - /// Our unique runtime id. - id: RuntimeId, - - /// If this is a "forked" runtime, then the `revision_guard` will - /// be `Some`; this guard holds a read-lock on the global query - /// lock. - revision_guard: Option<RevisionGuard>, - - /// Local state that is specific to this runtime (thread). - local_state: LocalState, - - /// Shared state that is accessible via all runtimes. - shared_state: Arc<SharedState>, -} - -#[derive(Clone, Debug)] -pub(crate) enum WaitResult { - Completed, - Panicked, - Cycle(Cycle), -} - -impl Default for Runtime { - fn default() -> Self { - Runtime { - id: RuntimeId { counter: 0 }, - revision_guard: None, - shared_state: Default::default(), - local_state: Default::default(), - } - } -} - -impl std::fmt::Debug for Runtime { - fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - fmt.debug_struct("Runtime") - .field("id", &self.id()) - .field("forked", &self.revision_guard.is_some()) - .field("shared_state", &self.shared_state) - .finish() - } -} - -impl Runtime { - /// Create a new runtime; equivalent to `Self::default`. This is - /// used when creating a new database. - pub fn new() -> Self { - Self::default() - } - - /// See [`crate::storage::Storage::snapshot`]. - pub(crate) fn snapshot(&self) -> Self { - if self.local_state.query_in_progress() { - panic!("it is not legal to `snapshot` during a query (see salsa-rs/salsa#80)"); - } - - let revision_guard = RevisionGuard::new(&self.shared_state); - - let id = RuntimeId { counter: self.shared_state.next_id.fetch_add(1, Ordering::SeqCst) }; - - Runtime { - id, - revision_guard: Some(revision_guard), - shared_state: self.shared_state.clone(), - local_state: Default::default(), - } - } - - /// A "synthetic write" causes the system to act *as though* some - /// input of durability `durability` has changed. This is mostly - /// useful for profiling scenarios. - /// - /// **WARNING:** Just like an ordinary write, this method triggers - /// cancellation. If you invoke it while a snapshot exists, it - /// will block until that snapshot is dropped -- if that snapshot - /// is owned by the current thread, this could trigger deadlock. - pub fn synthetic_write(&mut self, durability: Durability) { - self.with_incremented_revision(|_next_revision| Some(durability)); - } - - /// The unique identifier attached to this `SalsaRuntime`. Each - /// snapshotted runtime has a distinct identifier. - #[inline] - pub fn id(&self) -> RuntimeId { - self.id - } - - /// Returns the database-key for the query that this thread is - /// actively executing (if any). - pub fn active_query(&self) -> Option<DatabaseKeyIndex> { - self.local_state.active_query() - } - - /// Read current value of the revision counter. - #[inline] - pub(crate) fn current_revision(&self) -> Revision { - self.shared_state.revisions[0].load() - } - - /// The revision in which values with durability `d` may have last - /// changed. For D0, this is just the current revision. But for - /// higher levels of durability, this value may lag behind the - /// current revision. If we encounter a value of durability Di, - /// then, we can check this function to get a "bound" on when the - /// value may have changed, which allows us to skip walking its - /// dependencies. - #[inline] - pub(crate) fn last_changed_revision(&self, d: Durability) -> Revision { - self.shared_state.revisions[d.index()].load() - } - - /// Read current value of the revision counter. - #[inline] - pub(crate) fn pending_revision(&self) -> Revision { - self.shared_state.pending_revision.load() - } - - #[cold] - pub(crate) fn unwind_cancelled(&self) { - self.report_untracked_read(); - Cancelled::PendingWrite.throw(); - } - - /// Acquires the **global query write lock** (ensuring that no queries are - /// executing) and then increments the current revision counter; invokes - /// `op` with the global query write lock still held. - /// - /// While we wait to acquire the global query write lock, this method will - /// also increment `pending_revision_increments`, thus signalling to queries - /// that their results are "cancelled" and they should abort as expeditiously - /// as possible. - /// - /// The `op` closure should actually perform the writes needed. It is given - /// the new revision as an argument, and its return value indicates whether - /// any pre-existing value was modified: - /// - /// - returning `None` means that no pre-existing value was modified (this - /// could occur e.g. when setting some key on an input that was never set - /// before) - /// - returning `Some(d)` indicates that a pre-existing value was modified - /// and it had the durability `d`. This will update the records for when - /// values with each durability were modified. - /// - /// Note that, given our writer model, we can assume that only one thread is - /// attempting to increment the global revision at a time. - pub(crate) fn with_incremented_revision<F>(&mut self, op: F) - where - F: FnOnce(Revision) -> Option<Durability>, - { - tracing::trace!("increment_revision()"); - - if !self.permits_increment() { - panic!("increment_revision invoked during a query computation"); - } - - // Set the `pending_revision` field so that people - // know current revision is cancelled. - let current_revision = self.shared_state.pending_revision.fetch_then_increment(); - - // To modify the revision, we need the lock. - let shared_state = self.shared_state.clone(); - let _lock = shared_state.query_lock.write(); - - let old_revision = self.shared_state.revisions[0].fetch_then_increment(); - assert_eq!(current_revision, old_revision); - - let new_revision = current_revision.next(); - - trace!("increment_revision: incremented to {:?}", new_revision); - - if let Some(d) = op(new_revision) { - for rev in &self.shared_state.revisions[1..=d.index()] { - rev.store(new_revision); - } - } - } - - pub(crate) fn permits_increment(&self) -> bool { - self.revision_guard.is_none() && !self.local_state.query_in_progress() - } - - #[inline] - pub(crate) fn push_query(&self, database_key_index: DatabaseKeyIndex) -> ActiveQueryGuard<'_> { - self.local_state.push_query(database_key_index) - } - - /// Reports that the currently active query read the result from - /// another query. - /// - /// Also checks whether the "cycle participant" flag is set on - /// the current stack frame -- if so, panics with `CycleParticipant` - /// value, which should be caught by the code executing the query. - /// - /// # Parameters - /// - /// - `database_key`: the query whose result was read - /// - `changed_revision`: the last revision in which the result of that - /// query had changed - pub(crate) fn report_query_read_and_unwind_if_cycle_resulted( - &self, - input: DatabaseKeyIndex, - durability: Durability, - changed_at: Revision, - ) { - self.local_state - .report_query_read_and_unwind_if_cycle_resulted(input, durability, changed_at); - } - - /// Reports that the query depends on some state unknown to salsa. - /// - /// Queries which report untracked reads will be re-executed in the next - /// revision. - pub fn report_untracked_read(&self) { - self.local_state.report_untracked_read(self.current_revision()); - } - - /// Acts as though the current query had read an input with the given durability; this will force the current query's durability to be at most `durability`. - /// - /// This is mostly useful to control the durability level for [on-demand inputs](https://salsa-rs.github.io/salsa/common_patterns/on_demand_inputs.html). - pub fn report_synthetic_read(&self, durability: Durability) { - let changed_at = self.last_changed_revision(durability); - self.local_state.report_synthetic_read(durability, changed_at); - } - - /// Handles a cycle in the dependency graph that was detected when the - /// current thread tried to block on `database_key_index` which is being - /// executed by `to_id`. If this function returns, then `to_id` no longer - /// depends on the current thread, and so we should continue executing - /// as normal. Otherwise, the function will throw a `Cycle` which is expected - /// to be caught by some frame on our stack. This occurs either if there is - /// a frame on our stack with cycle recovery (possibly the top one!) or if there - /// is no cycle recovery at all. - fn unblock_cycle_and_maybe_throw( - &self, - db: &dyn Database, - dg: &mut DependencyGraph, - database_key_index: DatabaseKeyIndex, - to_id: RuntimeId, - ) { - trace!("unblock_cycle_and_maybe_throw(database_key={:?})", database_key_index); - - let mut from_stack = self.local_state.take_query_stack(); - let from_id = self.id(); - - // Make a "dummy stack frame". As we iterate through the cycle, we will collect the - // inputs from each participant. Then, if we are participating in cycle recovery, we - // will propagate those results to all participants. - let mut cycle_query = ActiveQuery::new(database_key_index); - - // Identify the cycle participants: - let cycle = { - let mut v = vec![]; - dg.for_each_cycle_participant( - from_id, - &mut from_stack, - database_key_index, - to_id, - |aqs| { - aqs.iter_mut().for_each(|aq| { - cycle_query.add_from(aq); - v.push(aq.database_key_index); - }); - }, - ); - - // We want to give the participants in a deterministic order - // (at least for this execution, not necessarily across executions), - // no matter where it started on the stack. Find the minimum - // key and rotate it to the front. - let index = v.iter().position_min().unwrap_or_default(); - v.rotate_left(index); - - // No need to store extra memory. - v.shrink_to_fit(); - - Cycle::new(Arc::new(v)) - }; - trace!("cycle {:?}, cycle_query {:#?}", cycle.debug(db), cycle_query,); - - // We can remove the cycle participants from the list of dependencies; - // they are a strongly connected component (SCC) and we only care about - // dependencies to things outside the SCC that control whether it will - // form again. - cycle_query.remove_cycle_participants(&cycle); - - // Mark each cycle participant that has recovery set, along with - // any frames that come after them on the same thread. Those frames - // are going to be unwound so that fallback can occur. - dg.for_each_cycle_participant(from_id, &mut from_stack, database_key_index, to_id, |aqs| { - aqs.iter_mut() - .skip_while(|aq| match db.cycle_recovery_strategy(aq.database_key_index) { - CycleRecoveryStrategy::Panic => true, - CycleRecoveryStrategy::Fallback => false, - }) - .for_each(|aq| { - trace!("marking {:?} for fallback", aq.database_key_index.debug(db)); - aq.take_inputs_from(&cycle_query); - assert!(aq.cycle.is_none()); - aq.cycle = Some(cycle.clone()); - }); - }); - - // Unblock every thread that has cycle recovery with a `WaitResult::Cycle`. - // They will throw the cycle, which will be caught by the frame that has - // cycle recovery so that it can execute that recovery. - let (me_recovered, others_recovered) = - dg.maybe_unblock_runtimes_in_cycle(from_id, &from_stack, database_key_index, to_id); - - self.local_state.restore_query_stack(from_stack); - - if me_recovered { - // If the current thread has recovery, we want to throw - // so that it can begin. - cycle.throw() - } else if others_recovered { - // If other threads have recovery but we didn't: return and we will block on them. - } else { - // if nobody has recover, then we panic - panic_any(cycle); - } - } - - /// Block until `other_id` completes executing `database_key`; - /// panic or unwind in the case of a cycle. - /// - /// `query_mutex_guard` is the guard for the current query's state; - /// it will be dropped after we have successfully registered the - /// dependency. - /// - /// # Propagating panics - /// - /// If the thread `other_id` panics, then our thread is considered - /// cancelled, so this function will panic with a `Cancelled` value. - /// - /// # Cycle handling - /// - /// If the thread `other_id` already depends on the current thread, - /// and hence there is a cycle in the query graph, then this function - /// will unwind instead of returning normally. The method of unwinding - /// depends on the [`Self::mutual_cycle_recovery_strategy`] - /// of the cycle participants: - /// - /// * [`CycleRecoveryStrategy::Panic`]: panic with the [`Cycle`] as the value. - /// * [`CycleRecoveryStrategy::Fallback`]: initiate unwinding with [`CycleParticipant::unwind`]. - pub(crate) fn block_on_or_unwind<QueryMutexGuard>( - &self, - db: &dyn Database, - database_key: DatabaseKeyIndex, - other_id: RuntimeId, - query_mutex_guard: QueryMutexGuard, - ) { - let mut dg = self.shared_state.dependency_graph.lock(); - - if dg.depends_on(other_id, self.id()) { - self.unblock_cycle_and_maybe_throw(db, &mut dg, database_key, other_id); - - // If the above fn returns, then (via cycle recovery) it has unblocked the - // cycle, so we can continue. - assert!(!dg.depends_on(other_id, self.id())); - } - - db.salsa_event(Event { - runtime_id: self.id(), - kind: EventKind::WillBlockOn { other_runtime_id: other_id, database_key }, - }); - - let stack = self.local_state.take_query_stack(); - - let (stack, result) = DependencyGraph::block_on( - dg, - self.id(), - database_key, - other_id, - stack, - query_mutex_guard, - ); - - self.local_state.restore_query_stack(stack); - - match result { - WaitResult::Completed => (), - - // If the other thread panicked, then we consider this thread - // cancelled. The assumption is that the panic will be detected - // by the other thread and responded to appropriately. - WaitResult::Panicked => Cancelled::PropagatedPanic.throw(), - - WaitResult::Cycle(c) => c.throw(), - } - } - - /// Invoked when this runtime completed computing `database_key` with - /// the given result `wait_result` (`wait_result` should be `None` if - /// computing `database_key` panicked and could not complete). - /// This function unblocks any dependent queries and allows them - /// to continue executing. - pub(crate) fn unblock_queries_blocked_on( - &self, - database_key: DatabaseKeyIndex, - wait_result: WaitResult, - ) { - self.shared_state - .dependency_graph - .lock() - .unblock_runtimes_blocked_on(database_key, wait_result); - } -} - -/// State that will be common to all threads (when we support multiple threads) -struct SharedState { - /// Stores the next id to use for a snapshotted runtime (starts at 1). - next_id: AtomicU32, - - /// Whenever derived queries are executing, they acquire this lock - /// in read mode. Mutating inputs (and thus creating a new - /// revision) requires a write lock (thus guaranteeing that no - /// derived queries are in progress). Note that this is not needed - /// to prevent **race conditions** -- the revision counter itself - /// is stored in an `AtomicUsize` so it can be cheaply read - /// without acquiring the lock. Rather, the `query_lock` is used - /// to ensure a higher-level consistency property. - query_lock: RwLock<()>, - - /// This is typically equal to `revision` -- set to `revision+1` - /// when a new revision is pending (which implies that the current - /// revision is cancelled). - pending_revision: AtomicRevision, - - /// Stores the "last change" revision for values of each Durability. - /// This vector is always of length at least 1 (for Durability 0) - /// but its total length depends on the number of Durabilities. The - /// element at index 0 is special as it represents the "current - /// revision". In general, we have the invariant that revisions - /// in here are *declining* -- that is, `revisions[i] >= - /// revisions[i + 1]`, for all `i`. This is because when you - /// modify a value with durability D, that implies that values - /// with durability less than D may have changed too. - revisions: [AtomicRevision; Durability::LEN], - - /// The dependency graph tracks which runtimes are blocked on one - /// another, waiting for queries to terminate. - dependency_graph: Mutex<DependencyGraph>, -} - -impl std::panic::RefUnwindSafe for SharedState {} - -impl Default for SharedState { - fn default() -> Self { - #[allow(clippy::declare_interior_mutable_const)] - const START: AtomicRevision = AtomicRevision::start(); - SharedState { - next_id: AtomicU32::new(1), - query_lock: Default::default(), - revisions: [START; Durability::LEN], - pending_revision: START, - dependency_graph: Default::default(), - } - } -} - -impl std::fmt::Debug for SharedState { - fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let query_lock = if self.query_lock.is_locked_exclusive() { - "<wlocked>" - } else if self.query_lock.is_locked() { - "<rlocked>" - } else { - "<unlocked>" - }; - fmt.debug_struct("SharedState") - .field("query_lock", &query_lock) - .field("revisions", &self.revisions) - .field("pending_revision", &self.pending_revision) - .finish() - } -} - -#[derive(Debug)] -struct ActiveQuery { - /// What query is executing - database_key_index: DatabaseKeyIndex, - - /// Minimum durability of inputs observed so far. - durability: Durability, - - /// Maximum revision of all inputs observed. If we observe an - /// untracked read, this will be set to the most recent revision. - changed_at: Revision, - - /// Set of subqueries that were accessed thus far, or `None` if - /// there was an untracked the read. - dependencies: Option<FxIndexSet<DatabaseKeyIndex>>, - - /// Stores the entire cycle, if one is found and this query is part of it. - cycle: Option<Cycle>, -} - -impl ActiveQuery { - fn new(database_key_index: DatabaseKeyIndex) -> Self { - ActiveQuery { - database_key_index, - durability: Durability::MAX, - changed_at: Revision::start(), - dependencies: Some(FxIndexSet::default()), - cycle: None, - } - } - - fn add_read(&mut self, input: DatabaseKeyIndex, durability: Durability, revision: Revision) { - if let Some(set) = &mut self.dependencies { - set.insert(input); - } - - self.durability = self.durability.min(durability); - self.changed_at = self.changed_at.max(revision); - } - - fn add_untracked_read(&mut self, changed_at: Revision) { - self.dependencies = None; - self.durability = Durability::LOW; - self.changed_at = changed_at; - } - - fn add_synthetic_read(&mut self, durability: Durability, revision: Revision) { - self.dependencies = None; - self.durability = self.durability.min(durability); - self.changed_at = self.changed_at.max(revision); - } - - pub(crate) fn revisions(&self) -> QueryRevisions { - let (inputs, untracked) = match &self.dependencies { - None => (None, true), - - Some(dependencies) => ( - if dependencies.is_empty() { - None - } else { - Some(ThinArc::from_header_and_iter((), dependencies.iter().copied())) - }, - false, - ), - }; - - QueryRevisions { - changed_at: self.changed_at, - inputs, - untracked, - durability: self.durability, - } - } - - /// Adds any dependencies from `other` into `self`. - /// Used during cycle recovery, see [`Runtime::create_cycle_error`]. - fn add_from(&mut self, other: &ActiveQuery) { - self.changed_at = self.changed_at.max(other.changed_at); - self.durability = self.durability.min(other.durability); - if let Some(other_dependencies) = &other.dependencies { - if let Some(my_dependencies) = &mut self.dependencies { - my_dependencies.extend(other_dependencies.iter().copied()); - } - } else { - self.dependencies = None; - } - } - - /// Removes the participants in `cycle` from my dependencies. - /// Used during cycle recovery, see [`Runtime::create_cycle_error`]. - fn remove_cycle_participants(&mut self, cycle: &Cycle) { - if let Some(my_dependencies) = &mut self.dependencies { - for p in cycle.participant_keys() { - my_dependencies.swap_remove(&p); - } - } - } - - /// Copy the changed-at, durability, and dependencies from `cycle_query`. - /// Used during cycle recovery, see [`Runtime::create_cycle_error`]. - pub(crate) fn take_inputs_from(&mut self, cycle_query: &ActiveQuery) { - self.changed_at = cycle_query.changed_at; - self.durability = cycle_query.durability; - self.dependencies.clone_from(&cycle_query.dependencies); - } -} - -/// A unique identifier for a particular runtime. Each time you create -/// a snapshot, a fresh `RuntimeId` is generated. Once a snapshot is -/// complete, its `RuntimeId` may potentially be re-used. -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub struct RuntimeId { - counter: u32, -} - -#[derive(Clone, Debug)] -pub(crate) struct StampedValue<V> { - pub(crate) value: V, - pub(crate) durability: Durability, - pub(crate) changed_at: Revision, -} - -struct RevisionGuard { - shared_state: Arc<SharedState>, -} - -impl RevisionGuard { - fn new(shared_state: &Arc<SharedState>) -> Self { - // Subtle: we use a "recursive" lock here so that it is not an - // error to acquire a read-lock when one is already held (this - // happens when a query uses `snapshot` to spawn off parallel - // workers, for example). - // - // This has the side-effect that we are responsible to ensure - // that people contending for the write lock do not starve, - // but this is what we achieve via the cancellation mechanism. - // - // (In particular, since we only ever have one "mutating - // handle" to the database, the only contention for the global - // query lock occurs when there are "futures" evaluating - // queries in parallel, and those futures hold a read-lock - // already, so the starvation problem is more about them bring - // themselves to a close, versus preventing other people from - // *starting* work). - unsafe { - shared_state.query_lock.raw().lock_shared_recursive(); - } - - Self { shared_state: shared_state.clone() } - } -} - -impl Drop for RevisionGuard { - fn drop(&mut self) { - // Release our read-lock without using RAII. As documented in - // `Snapshot::new` above, this requires the unsafe keyword. - unsafe { - self.shared_state.query_lock.raw().unlock_shared(); - } - } -} |