Diffstat (limited to 'src/effect/async/then.rs')
-rw-r--r--src/effect/async/then.rs187
1 files changed, 187 insertions, 0 deletions
diff --git a/src/effect/async/then.rs b/src/effect/async/then.rs
new file mode 100644
index 0000000..96b2c8c
--- /dev/null
+++ b/src/effect/async/then.rs
@@ -0,0 +1,187 @@
+use core::{
+ ops::ControlFlow,
+ pin::Pin,
+ task::{Context, Poll},
+};
+
+use futures::Future;
+use pin_project::pin_project;
+
+use crate::effect::{Effective, ErasedEffective};
+
+use super::{map::Map, Async, BoxOrReady};
+
+#[pin_project(project = ThenStateProj, project_replace = ThenStateReplace)]
+enum ThenState<'a, Fut, F, V>
+where
+ V: Effective<'a>,
+{
+ Incomplete {
+ #[pin]
+ future: Fut,
+ f: F,
+ },
+ Completed {
+ #[pin]
+ effective: V::IntoFuture,
+ },
+ Temp,
+}
+
+#[pin_project]
+pub struct Then<'a, Fut, F, V>
+where
+ V: Effective<'a>,
+{
+ #[pin]
+ state: ThenState<'a, Fut, F, V>,
+}
+
+impl<'a, Fut, F, V> Then<'a, Fut, F, V>
+where
+ V: Effective<'a>,
+{
+ pub(super) fn new(future: Fut, f: F) -> Self {
+ Self {
+ state: ThenState::Incomplete { future, f },
+ }
+ }
+}
+
+impl<'a, Fut, F, V> Future for Then<'a, Fut, F, V>
+where
+ Fut: Future,
+ F: FnOnce(Fut::Output) -> V,
+ V: Effective<'a>,
+{
+ type Output = V::Output;
+
+ #[inline(always)]
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut this = self.project();
+ match this.state.as_mut().project() {
+ ThenStateProj::Incomplete { future, .. } => {
+ let output = match future.poll(cx) {
+ Poll::Ready(value) => value,
+ Poll::Pending => return Poll::Pending,
+ };
+
+ match this.state.as_mut().project_replace(ThenState::Temp) {
+ ThenStateReplace::Incomplete { f, .. } => {
+ let effective = f(output).into_future();
+
+ this.state
+ .as_mut()
+ .project_replace(ThenState::Completed { effective });
+
+ match this.state.project() {
+ ThenStateProj::Completed { effective } => effective.poll(cx),
+ _ => unreachable!(),
+ }
+ }
+ _ => unreachable!(),
+ }
+ }
+ ThenStateProj::Completed { effective } => effective.poll(cx),
+ ThenStateProj::Temp => unreachable!(),
+ }
+ }
+}
+
+impl<'c, 'lt: 'c, Fut0: Send + Sync + 'lt, V0: Send + Sync + 'lt, F0: Send + Sync + 'c>
+ Effective<'c> for Then<'lt, Fut0, F0, V0>
+where
+ F0: FnOnce(Fut0::Output) -> V0,
+ V0: Effective<'lt>,
+ Fut0: Future,
+ Fut0::Output: Send + Sync + 'lt,
+{
+ fn into_erased<B>(self) -> ErasedEffective<'c, Self::Output, Self::Effect, B> {
+ BoxOrReady::Boxed(Box::pin(self))
+ }
+
+ type Effect = Async;
+
+ type Output = V0::Output;
+
+ type IntoFuture = Self;
+
+ fn into_future(self) -> Self::IntoFuture {
+ self
+ }
+
+ type Loop<'ctx: 'a, 'a, T: Send + Sync + 'a, F: Send + Sync + 'a>
+ = BoxOrReady<'a, (V0::Output, T)>
+ where
+ F: for<'b> FnMut(&'b mut Self::Output) -> ErasedEffective<'b, ControlFlow<T>, Self::Effect>,
+ 'c: 'a;
+
+ fn r#loop<'ctx: 'a, 'a, T: Send + Sync + 'a, F: Send + Sync + 'a>(self, mut cb: F) -> Self::Loop<'ctx, 'a, T, F>
+ where
+ F: for<'b> FnMut(&'b mut Self::Output) -> ErasedEffective<'b, ControlFlow<T>, Self::Effect>,
+ 'c: 'a,
+ {
+ BoxOrReady::Boxed(Box::pin(async move {
+ let mut this = self.into_future().await;
+
+ loop {
+ if let ControlFlow::Break(value) = cb(&mut this).into_future().await {
+ return (this, value);
+ }
+ }
+ }))
+ }
+
+ type Map<'a, T: Send + Sync + 'a, F: Send + Sync + 'a>
+ = Map<Self, F>
+ where
+ F: FnOnce(Self::Output) -> T,
+ 'c: 'a;
+
+ fn map<'a, T: Send + Sync + 'a, F: Send + Sync + 'a>(self, cb: F) -> Self::Map<'a, T, F>
+ where
+ F: FnOnce(Self::Output) -> T,
+ 'c: 'a,
+ {
+ Map::new(self, cb)
+ }
+
+ type Then<'a, T: Send + Sync + 'a, V: Send + Sync + 'a, F: Send + Sync + 'a>
+ = Then<'a, Self, F, V>
+ where
+ F: FnOnce(Self::Output) -> V,
+ V: Effective<'a, Output = T, Effect = Self::Effect>,
+ 'c: 'a;
+
+ fn then<'a, T: Send + Sync + 'a, V: Send + Sync + 'a, F: Send + Sync + 'a>(
+ self,
+ cb: F,
+ ) -> Self::Then<'a, T, V, F>
+ where
+ F: FnOnce(Self::Output) -> V,
+ V: Effective<'a, Output = T, Effect = Self::Effect>,
+ 'c: 'a,
+ {
+ Then::new(self, cb)
+ }
+
+ type AsCtx<'ctx: 'a, 'a, T: Send + Sync + 'a, F: Send + Sync + 'a>
+ = BoxOrReady<'a, (Self::Output, T)>
+ where
+ F: for<'b> FnOnce(&'b mut Self::Output) -> ErasedEffective<'b, T, Self::Effect>,
+ 'c: 'a;
+
+ fn as_ctx<'ctx: 'a, 'a, T: Send + Sync + 'a, F: Send + Sync + 'a>(self, cb: F) -> Self::AsCtx<'ctx, 'a, T, F>
+ where
+ F: for<'b> FnOnce(&'b mut Self::Output) -> ErasedEffective<'b, T, Self::Effect>,
+ 'c: 'a,
+ {
+ BoxOrReady::Boxed(Box::pin(async {
+ let mut this = self.await;
+
+ let result = cb(&mut this).into_future().await;
+
+ (this, result)
+ }))
+ }
+}