Unnamed repository; edit this file 'description' to name the repository.
Diffstat (limited to 'crates/ra-salsa/src/runtime.rs')
-rw-r--r--crates/ra-salsa/src/runtime.rs668
1 files changed, 0 insertions, 668 deletions
diff --git a/crates/ra-salsa/src/runtime.rs b/crates/ra-salsa/src/runtime.rs
deleted file mode 100644
index cb16ba0044..0000000000
--- a/crates/ra-salsa/src/runtime.rs
+++ /dev/null
@@ -1,668 +0,0 @@
-use crate::durability::Durability;
-use crate::hash::FxIndexSet;
-use crate::plumbing::CycleRecoveryStrategy;
-use crate::revision::{AtomicRevision, Revision};
-use crate::{Cancelled, Cycle, Database, DatabaseKeyIndex, Event, EventKind};
-use itertools::Itertools;
-use parking_lot::lock_api::{RawRwLock, RawRwLockRecursive};
-use parking_lot::{Mutex, RwLock};
-use std::hash::Hash;
-use std::panic::panic_any;
-use std::sync::atomic::{AtomicU32, Ordering};
-use tracing::trace;
-use triomphe::{Arc, ThinArc};
-
-mod dependency_graph;
-use dependency_graph::DependencyGraph;
-
-pub(crate) mod local_state;
-use local_state::LocalState;
-
-use self::local_state::{ActiveQueryGuard, QueryRevisions};
-
-/// The salsa runtime stores the storage for all queries as well as
-/// tracking the query stack and dependencies between cycles.
-///
-/// Each new runtime you create (e.g., via `Runtime::new` or
-/// `Runtime::default`) will have an independent set of query storage
-/// associated with it. Normally, therefore, you only do this once, at
-/// the start of your application.
-pub struct Runtime {
- /// Our unique runtime id.
- id: RuntimeId,
-
- /// If this is a "forked" runtime, then the `revision_guard` will
- /// be `Some`; this guard holds a read-lock on the global query
- /// lock.
- revision_guard: Option<RevisionGuard>,
-
- /// Local state that is specific to this runtime (thread).
- local_state: LocalState,
-
- /// Shared state that is accessible via all runtimes.
- shared_state: Arc<SharedState>,
-}
-
-#[derive(Clone, Debug)]
-pub(crate) enum WaitResult {
- Completed,
- Panicked,
- Cycle(Cycle),
-}
-
-impl Default for Runtime {
- fn default() -> Self {
- Runtime {
- id: RuntimeId { counter: 0 },
- revision_guard: None,
- shared_state: Default::default(),
- local_state: Default::default(),
- }
- }
-}
-
-impl std::fmt::Debug for Runtime {
- fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- fmt.debug_struct("Runtime")
- .field("id", &self.id())
- .field("forked", &self.revision_guard.is_some())
- .field("shared_state", &self.shared_state)
- .finish()
- }
-}
-
-impl Runtime {
- /// Create a new runtime; equivalent to `Self::default`. This is
- /// used when creating a new database.
- pub fn new() -> Self {
- Self::default()
- }
-
- /// See [`crate::storage::Storage::snapshot`].
- pub(crate) fn snapshot(&self) -> Self {
- if self.local_state.query_in_progress() {
- panic!("it is not legal to `snapshot` during a query (see salsa-rs/salsa#80)");
- }
-
- let revision_guard = RevisionGuard::new(&self.shared_state);
-
- let id = RuntimeId { counter: self.shared_state.next_id.fetch_add(1, Ordering::SeqCst) };
-
- Runtime {
- id,
- revision_guard: Some(revision_guard),
- shared_state: self.shared_state.clone(),
- local_state: Default::default(),
- }
- }
-
- /// A "synthetic write" causes the system to act *as though* some
- /// input of durability `durability` has changed. This is mostly
- /// useful for profiling scenarios.
- ///
- /// **WARNING:** Just like an ordinary write, this method triggers
- /// cancellation. If you invoke it while a snapshot exists, it
- /// will block until that snapshot is dropped -- if that snapshot
- /// is owned by the current thread, this could trigger deadlock.
- pub fn synthetic_write(&mut self, durability: Durability) {
- self.with_incremented_revision(|_next_revision| Some(durability));
- }
-
- /// The unique identifier attached to this `SalsaRuntime`. Each
- /// snapshotted runtime has a distinct identifier.
- #[inline]
- pub fn id(&self) -> RuntimeId {
- self.id
- }
-
- /// Returns the database-key for the query that this thread is
- /// actively executing (if any).
- pub fn active_query(&self) -> Option<DatabaseKeyIndex> {
- self.local_state.active_query()
- }
-
- /// Read current value of the revision counter.
- #[inline]
- pub(crate) fn current_revision(&self) -> Revision {
- self.shared_state.revisions[0].load()
- }
-
- /// The revision in which values with durability `d` may have last
- /// changed. For D0, this is just the current revision. But for
- /// higher levels of durability, this value may lag behind the
- /// current revision. If we encounter a value of durability Di,
- /// then, we can check this function to get a "bound" on when the
- /// value may have changed, which allows us to skip walking its
- /// dependencies.
- #[inline]
- pub(crate) fn last_changed_revision(&self, d: Durability) -> Revision {
- self.shared_state.revisions[d.index()].load()
- }
-
- /// Read current value of the revision counter.
- #[inline]
- pub(crate) fn pending_revision(&self) -> Revision {
- self.shared_state.pending_revision.load()
- }
-
- #[cold]
- pub(crate) fn unwind_cancelled(&self) {
- self.report_untracked_read();
- Cancelled::PendingWrite.throw();
- }
-
- /// Acquires the **global query write lock** (ensuring that no queries are
- /// executing) and then increments the current revision counter; invokes
- /// `op` with the global query write lock still held.
- ///
- /// While we wait to acquire the global query write lock, this method will
- /// also increment `pending_revision_increments`, thus signalling to queries
- /// that their results are "cancelled" and they should abort as expeditiously
- /// as possible.
- ///
- /// The `op` closure should actually perform the writes needed. It is given
- /// the new revision as an argument, and its return value indicates whether
- /// any pre-existing value was modified:
- ///
- /// - returning `None` means that no pre-existing value was modified (this
- /// could occur e.g. when setting some key on an input that was never set
- /// before)
- /// - returning `Some(d)` indicates that a pre-existing value was modified
- /// and it had the durability `d`. This will update the records for when
- /// values with each durability were modified.
- ///
- /// Note that, given our writer model, we can assume that only one thread is
- /// attempting to increment the global revision at a time.
- pub(crate) fn with_incremented_revision<F>(&mut self, op: F)
- where
- F: FnOnce(Revision) -> Option<Durability>,
- {
- tracing::trace!("increment_revision()");
-
- if !self.permits_increment() {
- panic!("increment_revision invoked during a query computation");
- }
-
- // Set the `pending_revision` field so that people
- // know current revision is cancelled.
- let current_revision = self.shared_state.pending_revision.fetch_then_increment();
-
- // To modify the revision, we need the lock.
- let shared_state = self.shared_state.clone();
- let _lock = shared_state.query_lock.write();
-
- let old_revision = self.shared_state.revisions[0].fetch_then_increment();
- assert_eq!(current_revision, old_revision);
-
- let new_revision = current_revision.next();
-
- trace!("increment_revision: incremented to {:?}", new_revision);
-
- if let Some(d) = op(new_revision) {
- for rev in &self.shared_state.revisions[1..=d.index()] {
- rev.store(new_revision);
- }
- }
- }
-
- pub(crate) fn permits_increment(&self) -> bool {
- self.revision_guard.is_none() && !self.local_state.query_in_progress()
- }
-
- #[inline]
- pub(crate) fn push_query(&self, database_key_index: DatabaseKeyIndex) -> ActiveQueryGuard<'_> {
- self.local_state.push_query(database_key_index)
- }
-
- /// Reports that the currently active query read the result from
- /// another query.
- ///
- /// Also checks whether the "cycle participant" flag is set on
- /// the current stack frame -- if so, panics with `CycleParticipant`
- /// value, which should be caught by the code executing the query.
- ///
- /// # Parameters
- ///
- /// - `database_key`: the query whose result was read
- /// - `changed_revision`: the last revision in which the result of that
- /// query had changed
- pub(crate) fn report_query_read_and_unwind_if_cycle_resulted(
- &self,
- input: DatabaseKeyIndex,
- durability: Durability,
- changed_at: Revision,
- ) {
- self.local_state
- .report_query_read_and_unwind_if_cycle_resulted(input, durability, changed_at);
- }
-
- /// Reports that the query depends on some state unknown to salsa.
- ///
- /// Queries which report untracked reads will be re-executed in the next
- /// revision.
- pub fn report_untracked_read(&self) {
- self.local_state.report_untracked_read(self.current_revision());
- }
-
- /// Acts as though the current query had read an input with the given durability; this will force the current query's durability to be at most `durability`.
- ///
- /// This is mostly useful to control the durability level for [on-demand inputs](https://salsa-rs.github.io/salsa/common_patterns/on_demand_inputs.html).
- pub fn report_synthetic_read(&self, durability: Durability) {
- let changed_at = self.last_changed_revision(durability);
- self.local_state.report_synthetic_read(durability, changed_at);
- }
-
- /// Handles a cycle in the dependency graph that was detected when the
- /// current thread tried to block on `database_key_index` which is being
- /// executed by `to_id`. If this function returns, then `to_id` no longer
- /// depends on the current thread, and so we should continue executing
- /// as normal. Otherwise, the function will throw a `Cycle` which is expected
- /// to be caught by some frame on our stack. This occurs either if there is
- /// a frame on our stack with cycle recovery (possibly the top one!) or if there
- /// is no cycle recovery at all.
- fn unblock_cycle_and_maybe_throw(
- &self,
- db: &dyn Database,
- dg: &mut DependencyGraph,
- database_key_index: DatabaseKeyIndex,
- to_id: RuntimeId,
- ) {
- trace!("unblock_cycle_and_maybe_throw(database_key={:?})", database_key_index);
-
- let mut from_stack = self.local_state.take_query_stack();
- let from_id = self.id();
-
- // Make a "dummy stack frame". As we iterate through the cycle, we will collect the
- // inputs from each participant. Then, if we are participating in cycle recovery, we
- // will propagate those results to all participants.
- let mut cycle_query = ActiveQuery::new(database_key_index);
-
- // Identify the cycle participants:
- let cycle = {
- let mut v = vec![];
- dg.for_each_cycle_participant(
- from_id,
- &mut from_stack,
- database_key_index,
- to_id,
- |aqs| {
- aqs.iter_mut().for_each(|aq| {
- cycle_query.add_from(aq);
- v.push(aq.database_key_index);
- });
- },
- );
-
- // We want to give the participants in a deterministic order
- // (at least for this execution, not necessarily across executions),
- // no matter where it started on the stack. Find the minimum
- // key and rotate it to the front.
- let index = v.iter().position_min().unwrap_or_default();
- v.rotate_left(index);
-
- // No need to store extra memory.
- v.shrink_to_fit();
-
- Cycle::new(Arc::new(v))
- };
- trace!("cycle {:?}, cycle_query {:#?}", cycle.debug(db), cycle_query,);
-
- // We can remove the cycle participants from the list of dependencies;
- // they are a strongly connected component (SCC) and we only care about
- // dependencies to things outside the SCC that control whether it will
- // form again.
- cycle_query.remove_cycle_participants(&cycle);
-
- // Mark each cycle participant that has recovery set, along with
- // any frames that come after them on the same thread. Those frames
- // are going to be unwound so that fallback can occur.
- dg.for_each_cycle_participant(from_id, &mut from_stack, database_key_index, to_id, |aqs| {
- aqs.iter_mut()
- .skip_while(|aq| match db.cycle_recovery_strategy(aq.database_key_index) {
- CycleRecoveryStrategy::Panic => true,
- CycleRecoveryStrategy::Fallback => false,
- })
- .for_each(|aq| {
- trace!("marking {:?} for fallback", aq.database_key_index.debug(db));
- aq.take_inputs_from(&cycle_query);
- assert!(aq.cycle.is_none());
- aq.cycle = Some(cycle.clone());
- });
- });
-
- // Unblock every thread that has cycle recovery with a `WaitResult::Cycle`.
- // They will throw the cycle, which will be caught by the frame that has
- // cycle recovery so that it can execute that recovery.
- let (me_recovered, others_recovered) =
- dg.maybe_unblock_runtimes_in_cycle(from_id, &from_stack, database_key_index, to_id);
-
- self.local_state.restore_query_stack(from_stack);
-
- if me_recovered {
- // If the current thread has recovery, we want to throw
- // so that it can begin.
- cycle.throw()
- } else if others_recovered {
- // If other threads have recovery but we didn't: return and we will block on them.
- } else {
- // if nobody has recover, then we panic
- panic_any(cycle);
- }
- }
-
- /// Block until `other_id` completes executing `database_key`;
- /// panic or unwind in the case of a cycle.
- ///
- /// `query_mutex_guard` is the guard for the current query's state;
- /// it will be dropped after we have successfully registered the
- /// dependency.
- ///
- /// # Propagating panics
- ///
- /// If the thread `other_id` panics, then our thread is considered
- /// cancelled, so this function will panic with a `Cancelled` value.
- ///
- /// # Cycle handling
- ///
- /// If the thread `other_id` already depends on the current thread,
- /// and hence there is a cycle in the query graph, then this function
- /// will unwind instead of returning normally. The method of unwinding
- /// depends on the [`Self::mutual_cycle_recovery_strategy`]
- /// of the cycle participants:
- ///
- /// * [`CycleRecoveryStrategy::Panic`]: panic with the [`Cycle`] as the value.
- /// * [`CycleRecoveryStrategy::Fallback`]: initiate unwinding with [`CycleParticipant::unwind`].
- pub(crate) fn block_on_or_unwind<QueryMutexGuard>(
- &self,
- db: &dyn Database,
- database_key: DatabaseKeyIndex,
- other_id: RuntimeId,
- query_mutex_guard: QueryMutexGuard,
- ) {
- let mut dg = self.shared_state.dependency_graph.lock();
-
- if dg.depends_on(other_id, self.id()) {
- self.unblock_cycle_and_maybe_throw(db, &mut dg, database_key, other_id);
-
- // If the above fn returns, then (via cycle recovery) it has unblocked the
- // cycle, so we can continue.
- assert!(!dg.depends_on(other_id, self.id()));
- }
-
- db.salsa_event(Event {
- runtime_id: self.id(),
- kind: EventKind::WillBlockOn { other_runtime_id: other_id, database_key },
- });
-
- let stack = self.local_state.take_query_stack();
-
- let (stack, result) = DependencyGraph::block_on(
- dg,
- self.id(),
- database_key,
- other_id,
- stack,
- query_mutex_guard,
- );
-
- self.local_state.restore_query_stack(stack);
-
- match result {
- WaitResult::Completed => (),
-
- // If the other thread panicked, then we consider this thread
- // cancelled. The assumption is that the panic will be detected
- // by the other thread and responded to appropriately.
- WaitResult::Panicked => Cancelled::PropagatedPanic.throw(),
-
- WaitResult::Cycle(c) => c.throw(),
- }
- }
-
- /// Invoked when this runtime completed computing `database_key` with
- /// the given result `wait_result` (`wait_result` should be `None` if
- /// computing `database_key` panicked and could not complete).
- /// This function unblocks any dependent queries and allows them
- /// to continue executing.
- pub(crate) fn unblock_queries_blocked_on(
- &self,
- database_key: DatabaseKeyIndex,
- wait_result: WaitResult,
- ) {
- self.shared_state
- .dependency_graph
- .lock()
- .unblock_runtimes_blocked_on(database_key, wait_result);
- }
-}
-
-/// State that will be common to all threads (when we support multiple threads)
-struct SharedState {
- /// Stores the next id to use for a snapshotted runtime (starts at 1).
- next_id: AtomicU32,
-
- /// Whenever derived queries are executing, they acquire this lock
- /// in read mode. Mutating inputs (and thus creating a new
- /// revision) requires a write lock (thus guaranteeing that no
- /// derived queries are in progress). Note that this is not needed
- /// to prevent **race conditions** -- the revision counter itself
- /// is stored in an `AtomicUsize` so it can be cheaply read
- /// without acquiring the lock. Rather, the `query_lock` is used
- /// to ensure a higher-level consistency property.
- query_lock: RwLock<()>,
-
- /// This is typically equal to `revision` -- set to `revision+1`
- /// when a new revision is pending (which implies that the current
- /// revision is cancelled).
- pending_revision: AtomicRevision,
-
- /// Stores the "last change" revision for values of each Durability.
- /// This vector is always of length at least 1 (for Durability 0)
- /// but its total length depends on the number of Durabilities. The
- /// element at index 0 is special as it represents the "current
- /// revision". In general, we have the invariant that revisions
- /// in here are *declining* -- that is, `revisions[i] >=
- /// revisions[i + 1]`, for all `i`. This is because when you
- /// modify a value with durability D, that implies that values
- /// with durability less than D may have changed too.
- revisions: [AtomicRevision; Durability::LEN],
-
- /// The dependency graph tracks which runtimes are blocked on one
- /// another, waiting for queries to terminate.
- dependency_graph: Mutex<DependencyGraph>,
-}
-
-impl std::panic::RefUnwindSafe for SharedState {}
-
-impl Default for SharedState {
- fn default() -> Self {
- #[allow(clippy::declare_interior_mutable_const)]
- const START: AtomicRevision = AtomicRevision::start();
- SharedState {
- next_id: AtomicU32::new(1),
- query_lock: Default::default(),
- revisions: [START; Durability::LEN],
- pending_revision: START,
- dependency_graph: Default::default(),
- }
- }
-}
-
-impl std::fmt::Debug for SharedState {
- fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- let query_lock = if self.query_lock.is_locked_exclusive() {
- "<wlocked>"
- } else if self.query_lock.is_locked() {
- "<rlocked>"
- } else {
- "<unlocked>"
- };
- fmt.debug_struct("SharedState")
- .field("query_lock", &query_lock)
- .field("revisions", &self.revisions)
- .field("pending_revision", &self.pending_revision)
- .finish()
- }
-}
-
-#[derive(Debug)]
-struct ActiveQuery {
- /// What query is executing
- database_key_index: DatabaseKeyIndex,
-
- /// Minimum durability of inputs observed so far.
- durability: Durability,
-
- /// Maximum revision of all inputs observed. If we observe an
- /// untracked read, this will be set to the most recent revision.
- changed_at: Revision,
-
- /// Set of subqueries that were accessed thus far, or `None` if
- /// there was an untracked the read.
- dependencies: Option<FxIndexSet<DatabaseKeyIndex>>,
-
- /// Stores the entire cycle, if one is found and this query is part of it.
- cycle: Option<Cycle>,
-}
-
-impl ActiveQuery {
- fn new(database_key_index: DatabaseKeyIndex) -> Self {
- ActiveQuery {
- database_key_index,
- durability: Durability::MAX,
- changed_at: Revision::start(),
- dependencies: Some(FxIndexSet::default()),
- cycle: None,
- }
- }
-
- fn add_read(&mut self, input: DatabaseKeyIndex, durability: Durability, revision: Revision) {
- if let Some(set) = &mut self.dependencies {
- set.insert(input);
- }
-
- self.durability = self.durability.min(durability);
- self.changed_at = self.changed_at.max(revision);
- }
-
- fn add_untracked_read(&mut self, changed_at: Revision) {
- self.dependencies = None;
- self.durability = Durability::LOW;
- self.changed_at = changed_at;
- }
-
- fn add_synthetic_read(&mut self, durability: Durability, revision: Revision) {
- self.dependencies = None;
- self.durability = self.durability.min(durability);
- self.changed_at = self.changed_at.max(revision);
- }
-
- pub(crate) fn revisions(&self) -> QueryRevisions {
- let (inputs, untracked) = match &self.dependencies {
- None => (None, true),
-
- Some(dependencies) => (
- if dependencies.is_empty() {
- None
- } else {
- Some(ThinArc::from_header_and_iter((), dependencies.iter().copied()))
- },
- false,
- ),
- };
-
- QueryRevisions {
- changed_at: self.changed_at,
- inputs,
- untracked,
- durability: self.durability,
- }
- }
-
- /// Adds any dependencies from `other` into `self`.
- /// Used during cycle recovery, see [`Runtime::create_cycle_error`].
- fn add_from(&mut self, other: &ActiveQuery) {
- self.changed_at = self.changed_at.max(other.changed_at);
- self.durability = self.durability.min(other.durability);
- if let Some(other_dependencies) = &other.dependencies {
- if let Some(my_dependencies) = &mut self.dependencies {
- my_dependencies.extend(other_dependencies.iter().copied());
- }
- } else {
- self.dependencies = None;
- }
- }
-
- /// Removes the participants in `cycle` from my dependencies.
- /// Used during cycle recovery, see [`Runtime::create_cycle_error`].
- fn remove_cycle_participants(&mut self, cycle: &Cycle) {
- if let Some(my_dependencies) = &mut self.dependencies {
- for p in cycle.participant_keys() {
- my_dependencies.swap_remove(&p);
- }
- }
- }
-
- /// Copy the changed-at, durability, and dependencies from `cycle_query`.
- /// Used during cycle recovery, see [`Runtime::create_cycle_error`].
- pub(crate) fn take_inputs_from(&mut self, cycle_query: &ActiveQuery) {
- self.changed_at = cycle_query.changed_at;
- self.durability = cycle_query.durability;
- self.dependencies.clone_from(&cycle_query.dependencies);
- }
-}
-
-/// A unique identifier for a particular runtime. Each time you create
-/// a snapshot, a fresh `RuntimeId` is generated. Once a snapshot is
-/// complete, its `RuntimeId` may potentially be re-used.
-#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
-pub struct RuntimeId {
- counter: u32,
-}
-
-#[derive(Clone, Debug)]
-pub(crate) struct StampedValue<V> {
- pub(crate) value: V,
- pub(crate) durability: Durability,
- pub(crate) changed_at: Revision,
-}
-
-struct RevisionGuard {
- shared_state: Arc<SharedState>,
-}
-
-impl RevisionGuard {
- fn new(shared_state: &Arc<SharedState>) -> Self {
- // Subtle: we use a "recursive" lock here so that it is not an
- // error to acquire a read-lock when one is already held (this
- // happens when a query uses `snapshot` to spawn off parallel
- // workers, for example).
- //
- // This has the side-effect that we are responsible to ensure
- // that people contending for the write lock do not starve,
- // but this is what we achieve via the cancellation mechanism.
- //
- // (In particular, since we only ever have one "mutating
- // handle" to the database, the only contention for the global
- // query lock occurs when there are "futures" evaluating
- // queries in parallel, and those futures hold a read-lock
- // already, so the starvation problem is more about them bring
- // themselves to a close, versus preventing other people from
- // *starting* work).
- unsafe {
- shared_state.query_lock.raw().lock_shared_recursive();
- }
-
- Self { shared_state: shared_state.clone() }
- }
-}
-
-impl Drop for RevisionGuard {
- fn drop(&mut self) {
- // Release our read-lock without using RAII. As documented in
- // `Snapshot::new` above, this requires the unsafe keyword.
- unsafe {
- self.shared_state.query_lock.raw().unlock_shared();
- }
- }
-}