1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
use crate::{
    channel::{Channel, ForkHandle},
    kind,
    kind::{Future, Stream},
    ConstructResult, DeconstructResult, Kind,
};

use futures::{stream::unfold, SinkExt, StreamExt};

use super::WrappedError;

// `Kind` implementation for a stream of `Kind` items.
//
// Protocol, as implemented below: the deconstructing side forks every item
// yielded by the stream onto the channel and sends `Some(handle)` for each
// one, then sends a single `None` once the stream is exhausted. The
// constructing side rebuilds each item from its fork handle and stops at the
// `None` terminator. Nothing is ever sent from the constructing side back to
// the deconstructing side.
#[kind]
impl<T> Kind for Stream<T>
where
    T: Kind,
{
    // `Some(handle)` announces one forked item; `None` marks the end of the stream.
    type ConstructItem = Option<ForkHandle>;
    type ConstructError = WrappedError<T::ConstructError>;
    type ConstructFuture = Future<ConstructResult<Self>>;
    // The constructing side sends nothing back.
    type DeconstructItem = ();
    type DeconstructError = WrappedError<T::DeconstructError>;
    type DeconstructFuture = Future<DeconstructResult<Self>>;

    // Drains `self`, forking each item onto `channel` and sending the
    // resulting handle, then sends the `None` terminator.
    //
    // Resolves to an error as soon as forking an item fails or a send on the
    // channel fails (the latter is wrapped as `WrappedError::Send`); items
    // remaining in the stream are not processed in that case.
    fn deconstruct<C: Channel<Self::DeconstructItem, Self::ConstructItem>>(
        mut self,
        mut channel: C,
    ) -> Self::DeconstructFuture {
        Box::pin(async move {
            while let Some(item) = self.next().await {
                // Fork first, then announce the handle to the other side.
                let handle = channel.fork(item).await?;
                channel
                    .send(Some(handle))
                    .await
                    .map_err(WrappedError::Send)?;
            }
            // Tell the constructing side that no further items will follow.
            channel.send(None).await.map_err(WrappedError::Send)?;
            Ok(())
        })
    }

    // Builds a stream that lazily pulls fork handles from `channel` and
    // yields the item constructed from each one.
    //
    // The returned stream ends when the `None` terminator is received, or
    // when the channel itself closes without sending one (previously this
    // case panicked).
    //
    // # Panics
    //
    // Polling the returned stream panics if an item cannot be constructed
    // from its fork: the stream's item type is `T`, so there is no way to
    // hand the error to the consumer.
    fn construct<C: Channel<Self::ConstructItem, Self::DeconstructItem>>(
        channel: C,
    ) -> Self::ConstructFuture {
        Box::pin(async move {
            Ok(Box::pin(unfold(channel, |mut channel| {
                async move {
                    match channel.next().await {
                        Some(Some(handle)) => Some((
                            channel
                                .get_fork(handle)
                                .await
                                .expect("failed to construct stream item from fork"),
                            channel,
                        )),
                        // Explicit terminator, or the channel closed early:
                        // either way there is nothing more to yield.
                        Some(None) | None => None,
                    }
                }
            })) as Stream<T>)
        })
    }
}