1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
pub mod id_channel;
pub use id_channel::IdChannel;

use crate::{
    kind::{Fallible, Future},
    Kind,
};

use anyhow::Error;
use core::fmt::{self, Display, Formatter};
use futures::{Sink, Stream};
use serde::{
    de::{DeserializeOwned, DeserializeSeed},
    Deserialize, Serialize,
};
use thiserror::Error;

#[derive(Serialize, Deserialize, Debug, PartialEq, Hash, Eq, Clone, Copy)]
#[repr(transparent)]
pub struct ForkHandle(pub(crate) u32);

impl Display for ForkHandle {
    fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
        write!(formatter, "{}", self.0)
    }
}

pub trait Fork: Sync + Send + 'static {
    fn fork<K: Kind>(&self, kind: K) -> Fallible<ForkHandle, K::DeconstructError>;
    fn get_fork<K: Kind>(&self, fork_ref: ForkHandle) -> Fallible<K, K::ConstructError>;
}

#[derive(Debug, Error)]
pub struct ChannelError(#[source] pub(crate) Error);

impl Display for ChannelError {
    fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
        write!(formatter, "{}", self.0)
    }
}

pub trait Channel<
    I: Serialize + DeserializeOwned + Sync + Send + 'static,
    O: Serialize + DeserializeOwned + Sync + Send + 'static,
>: Stream<Item = I> + Sink<O, Error = ChannelError> + Fork + Send + Sync + Unpin
{
}

pub trait Shim<'a, T: Target<'a, K>, K: Kind>:
    Context<'a, Item = <T as Context<'a>>::Item>
{
    fn complete<
        C: Sync
            + Send
            + Stream<Item = <T as Context<'a>>::Item>
            + Sink<<T as Context<'a>>::Item>
            + 'static,
    >(
        self,
        input: C,
    ) -> Fallible<K, K::ConstructError>;
}

pub trait Target<'a, K: Kind>: Context<'a> + Sized + Send + Sync {
    type Shim: Shim<'a, Self, K>;

    fn new_with(kind: K) -> Future<Self>
    where
        K::DeconstructFuture: Send;

    fn new_shim() -> Self::Shim;
}

pub trait Waiter {
    type Item;

    fn wait_for(&self, data: String) -> Future<()>;
    fn predicate(&self, item: &Self::Item) -> bool;
}

pub trait Context<'de> {
    type Item: Serialize + Sync + Send + 'static;
    type Target: Waiter<Item = Self::Item>
        + DeserializeSeed<'de, Value = Self::Item>
        + Clone
        + Sync
        + Send
        + 'static;

    fn context(&self) -> Self::Target;
}

pub trait OnTo: Kind {
    fn on_to<'a, T: Target<'a, Self>>(self) -> Future<T>
    where
        Self: Send + 'static,
        Self::DeconstructFuture: Sync + Send;
}

impl<K: Kind> OnTo for K {
    fn on_to<'a, T: Target<'a, Self>>(self) -> Future<T>
    where
        Self: Send + 'static,
        Self::DeconstructFuture: Sync + Send,
    {
        T::new_with(self)
    }
}