#![recursion_limit = "256"]
#[macro_use]
extern crate failure_derive;
pub mod errors;
pub mod filters;
pub use errors::StorkError;
pub use filters::FilterSet;
use async_stream::try_stream;
use futures::prelude::*;
use std::pin::Pin;
use std::sync::Arc;
use failure::Error;
use failure::ResultExt;
#[derive(Debug, Clone)]
pub struct Storkable<T: Unpin, C: StorkClient<T>> {
value: T,
filters: FilterSet<T>,
client: Arc<C>,
parent: Option<Arc<Storkable<T, C>>>,
}
impl<'a, T: Unpin + 'a, C: StorkClient<T> + 'a> Storkable<T, C> {
pub fn new(val: T) -> Self {
Self {
value: val,
filters: FilterSet::default(),
client: Arc::new(C::default()),
parent: None,
}
}
pub fn with_filters(mut self, filters: FilterSet<T>) -> Self {
self.filters = filters;
self
}
pub fn with_client(mut self, client: C) -> Self {
self.client = Arc::new(client);
self
}
pub fn filters(&self) -> &FilterSet<T> {
&self.filters
}
pub fn val(&self) -> &T {
&self.value
}
pub fn parent(&self) -> Option<&Storkable<T, C>> {
self.parent.as_ref().map(Arc::as_ref)
}
pub fn exec<'b>(self) -> impl futures::Stream<Item = Result<Storkable<T, C>, Error>> + 'a {
let this = Arc::new(self);
try_stream! {
let mut children = this.client.run(this.val());
while let Some(child) = children.next().await {
let child = child.context(StorkError::ClientError)?;
if !this.filters.matches(&child) {
continue;
}
yield Storkable {
value: child,
client: Arc::clone(&this.client),
filters: this.filters.clone(),
parent: Some(Arc::clone(&this)),
};
}
}
}
}
pub trait StorkClient<T>: Default {
fn run(&self, src: &T) -> Pin<Box<dyn futures::Stream<Item = Result<T, Error>>>>;
}