Diffstat (limited to 'src/effect/async/then.rs')
| -rw-r--r-- | src/effect/async/then.rs | 187 |
1 files changed, 187 insertions, 0 deletions
diff --git a/src/effect/async/then.rs b/src/effect/async/then.rs new file mode 100644 index 0000000..96b2c8c --- /dev/null +++ b/src/effect/async/then.rs @@ -0,0 +1,187 @@ +use core::{ + ops::ControlFlow, + pin::Pin, + task::{Context, Poll}, +}; + +use futures::Future; +use pin_project::pin_project; + +use crate::effect::{Effective, ErasedEffective}; + +use super::{map::Map, Async, BoxOrReady}; + +#[pin_project(project = ThenStateProj, project_replace = ThenStateReplace)] +enum ThenState<'a, Fut, F, V> +where + V: Effective<'a>, +{ + Incomplete { + #[pin] + future: Fut, + f: F, + }, + Completed { + #[pin] + effective: V::IntoFuture, + }, + Temp, +} + +#[pin_project] +pub struct Then<'a, Fut, F, V> +where + V: Effective<'a>, +{ + #[pin] + state: ThenState<'a, Fut, F, V>, +} + +impl<'a, Fut, F, V> Then<'a, Fut, F, V> +where + V: Effective<'a>, +{ + pub(super) fn new(future: Fut, f: F) -> Self { + Self { + state: ThenState::Incomplete { future, f }, + } + } +} + +impl<'a, Fut, F, V> Future for Then<'a, Fut, F, V> +where + Fut: Future, + F: FnOnce(Fut::Output) -> V, + V: Effective<'a>, +{ + type Output = V::Output; + + #[inline(always)] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut this = self.project(); + match this.state.as_mut().project() { + ThenStateProj::Incomplete { future, .. } => { + let output = match future.poll(cx) { + Poll::Ready(value) => value, + Poll::Pending => return Poll::Pending, + }; + + match this.state.as_mut().project_replace(ThenState::Temp) { + ThenStateReplace::Incomplete { f, .. } => { + let effective = f(output).into_future(); + + this.state + .as_mut() + .project_replace(ThenState::Completed { effective }); + + match this.state.project() { + ThenStateProj::Completed { effective } => effective.poll(cx), + _ => unreachable!(), + } + } + _ => unreachable!(), + } + } + ThenStateProj::Completed { effective } => effective.poll(cx), + ThenStateProj::Temp => unreachable!(), + } + } +} + +impl<'c, 'lt: 'c, Fut0: Send + Sync + 'lt, V0: Send + Sync + 'lt, F0: Send + Sync + 'c> + Effective<'c> for Then<'lt, Fut0, F0, V0> +where + F0: FnOnce(Fut0::Output) -> V0, + V0: Effective<'lt>, + Fut0: Future, + Fut0::Output: Send + Sync + 'lt, +{ + fn into_erased<B>(self) -> ErasedEffective<'c, Self::Output, Self::Effect, B> { + BoxOrReady::Boxed(Box::pin(self)) + } + + type Effect = Async; + + type Output = V0::Output; + + type IntoFuture = Self; + + fn into_future(self) -> Self::IntoFuture { + self + } + + type Loop<'ctx: 'a, 'a, T: Send + Sync + 'a, F: Send + Sync + 'a> + = BoxOrReady<'a, (V0::Output, T)> + where + F: for<'b> FnMut(&'b mut Self::Output) -> ErasedEffective<'b, ControlFlow<T>, Self::Effect>, + 'c: 'a; + + fn r#loop<'ctx: 'a, 'a, T: Send + Sync + 'a, F: Send + Sync + 'a>(self, mut cb: F) -> Self::Loop<'ctx, 'a, T, F> + where + F: for<'b> FnMut(&'b mut Self::Output) -> ErasedEffective<'b, ControlFlow<T>, Self::Effect>, + 'c: 'a, + { + BoxOrReady::Boxed(Box::pin(async move { + let mut this = self.into_future().await; + + loop { + if let ControlFlow::Break(value) = cb(&mut this).into_future().await { + return (this, value); + } + } + })) + } + + type Map<'a, T: Send + Sync + 'a, F: Send + Sync + 'a> + = Map<Self, F> + where + F: FnOnce(Self::Output) -> T, + 'c: 'a; + + fn map<'a, T: Send + Sync + 'a, F: Send + Sync + 'a>(self, cb: F) -> Self::Map<'a, T, F> + where + F: FnOnce(Self::Output) -> T, + 'c: 'a, + { + Map::new(self, cb) + } + + type Then<'a, T: Send + Sync + 'a, V: Send + Sync + 'a, F: Send + Sync + 'a> + = Then<'a, Self, F, V> + where + F: FnOnce(Self::Output) -> V, + V: Effective<'a, Output = T, Effect = Self::Effect>, + 'c: 'a; + + fn then<'a, T: Send + Sync + 'a, V: Send + Sync + 'a, F: Send + Sync + 'a>( + self, + cb: F, + ) -> Self::Then<'a, T, V, F> + where + F: FnOnce(Self::Output) -> V, + V: Effective<'a, Output = T, Effect = Self::Effect>, + 'c: 'a, + { + Then::new(self, cb) + } + + type AsCtx<'ctx: 'a, 'a, T: Send + Sync + 'a, F: Send + Sync + 'a> + = BoxOrReady<'a, (Self::Output, T)> + where + F: for<'b> FnOnce(&'b mut Self::Output) -> ErasedEffective<'b, T, Self::Effect>, + 'c: 'a; + + fn as_ctx<'ctx: 'a, 'a, T: Send + Sync + 'a, F: Send + Sync + 'a>(self, cb: F) -> Self::AsCtx<'ctx, 'a, T, F> + where + F: for<'b> FnOnce(&'b mut Self::Output) -> ErasedEffective<'b, T, Self::Effect>, + 'c: 'a, + { + BoxOrReady::Boxed(Box::pin(async { + let mut this = self.await; + + let result = cb(&mut this).into_future().await; + + (this, result) + })) + } +} |