1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
use crate::{ channel::{Channel, ForkHandle}, kind, kind::{Future, Sink}, ConstructResult, DeconstructResult, Kind, }; use futures::{ future::ready, lock::Mutex, task::{Context, Poll}, Sink as ISink, SinkExt, StreamExt, }; use super::WrappedError; use alloc::sync::Arc; use core::{marker::PhantomData, pin::Pin}; use void::Void; pub struct KindSink<T: Kind, E: Kind, C: Channel<ForkHandle, ForkHandle>> { channel: Arc<Mutex<C>>, _marker: PhantomData<(T, E)>, item: Future<()>, } impl<T: Kind, E: Kind, C: Channel<ForkHandle, ForkHandle>> ISink<T> for KindSink<T, E, C> { type Error = E; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { let poll = self.item.as_mut().poll(cx).map(Ok); if let Poll::Ready(_) = poll { self.item = Box::pin(ready(())); } poll } fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { let channel = self.channel.clone(); self.item = Box::pin(async move { let mut channel = channel.lock().await; let handle = channel.fork(item).await.unwrap(); channel.send(handle).await.unwrap_or_else(|_| panic!()); }); Ok(()) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { let poll = self.item.as_mut().poll(cx).map(Ok); if let Poll::Ready(_) = poll { self.item = Box::pin(ready(())); } poll } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { let poll = self.item.as_mut().poll(cx).map(Ok); if let Poll::Ready(_) = poll { self.item = Box::pin(ready(())); } poll } } #[kind] impl<T, E> Kind for Sink<T, E> where T: Kind, E: Kind, { type ConstructItem = ForkHandle; type ConstructError = Void; type ConstructFuture = Future<ConstructResult<Self>>; type DeconstructItem = ForkHandle; type DeconstructError = WrappedError<Void>; type DeconstructFuture = Future<DeconstructResult<Self>>; fn deconstruct<C: Channel<Self::DeconstructItem, Self::ConstructItem>>( mut self, mut channel: C, ) -> Self::DeconstructFuture { Box::pin(async move { while let Some(handle) = channel.next().await { if let Err(error) = self .send(channel.get_fork::<T>(handle).await.unwrap()) .await { let handle = channel.fork::<E>(error).await.unwrap(); channel.send(handle).await.map_err(WrappedError::Send)?; } } Ok(()) }) } fn construct<C: Channel<Self::ConstructItem, Self::DeconstructItem>>( channel: C, ) -> Self::ConstructFuture { Box::pin(async move { Ok(Box::pin(KindSink { channel: Arc::new(Mutex::new(channel)), _marker: PhantomData, item: Box::pin(ready(())), }) as Sink<T, E>) }) } }