1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
mod array;
mod collections;
mod default;
mod error;
mod functions;
mod future;
mod iterator;
mod option;
mod phantom_data;
mod primitives;
mod result;
mod serde;
mod sink;
mod sink_stream;
mod stream;
mod tuple;
mod unit;
mod url;
pub mod using;
mod wrapped;
pub use self::serde::Serde;
pub use default::Default;
pub use iterator::Iterator;
pub use sink_stream::SinkStream;

use anyhow::Error;
use core::pin::Pin;
use futures::{
    stream::once, Future as IFuture, FutureExt, Sink as ISink, Stream as IStream, StreamExt,
};
use std::error::Error as StdError;
use thiserror::Error;

use crate::{channel::ChannelError, Kind};

#[derive(Error, Kind, Debug)]
#[error("transport error: {cause}")]
pub struct TransportError {
    #[source]
    cause: Error,
}

impl TransportError {
    fn new(cause: Error) -> Self {
        TransportError { cause }
    }
}

pub type Future<T> = Pin<Box<dyn IFuture<Output = T> + Sync + Send>>;
pub type Fallible<T, E> = Future<Result<T, E>>;
pub type Stream<T> = Pin<Box<dyn IStream<Item = T> + Sync + Send>>;
pub type Infallible<T> = Fallible<T, TransportError>;
pub type Sink<T, E> = Pin<Box<dyn ISink<T, Error = E> + Sync + Send>>;

/// The result of reconstructing a Kind.
pub type ConstructResult<K> = Result<K, <K as Kind>::ConstructError>;
/// The result of deconstructing a Kind.
pub type DeconstructResult<K> = Result<(), <K as Kind>::DeconstructError>;

pub trait Flatten: Sized {
    fn flatten<
        E: 'static + Sync + Send + Into<Error>,
        F: IFuture<Output = Result<Self, E>> + Sync + Send + 'static,
    >(
        fut: F,
    ) -> Self;
}

impl<U: From<TransportError> + Sync + Send, T> Flatten for Fallible<T, U> {
    fn flatten<
        E: 'static + Sync + Send + Into<Error>,
        F: IFuture<Output = Result<Self, E>> + Sync + Send + 'static,
    >(
        fut: F,
    ) -> Self {
        Box::pin(async move {
            fut.await
                .map_err(|e| U::from(TransportError::new(e.into())))?
                .await
        })
    }
}

impl<U: From<TransportError>, T> Flatten for Stream<Result<T, U>> {
    fn flatten<
        E: 'static + Sync + Send + Into<Error>,
        F: IFuture<Output = Result<Self, E>> + Sync + Send + 'static,
    >(
        fut: F,
    ) -> Self {
        Box::pin(
            async move {
                let r = fut.await;
                match r {
                    Err(e) => Box::pin(once(
                        async move { Err(U::from(TransportError::new(e.into()))) },
                    )) as Stream<Result<T, U>>,
                    Ok(s) => Box::pin(s),
                }
            }
            .into_stream()
            .flatten(),
        )
    }
}

pub trait AsKindMarker {}

#[derive(Error, Debug)]
pub enum WrappedError<T: StdError + 'static> {
    #[error("{0}")]
    Concrete(#[from] T),
    #[error("got {got} items in construct, expected {expected}")]
    Insufficient { got: usize, expected: usize },
    #[error("failed to send on underlying channel: {0}")]
    Send(ChannelError),
}

pub trait AsKind<M: AsKindMarker>: Sized {
    type Kind: Kind;

    fn into_kind(self) -> Self::Kind;
    fn from_kind(kind: Self::Kind) -> Self;
}