🏡 index : ~doyle/stork.git

author Jordan Doyle <jordan@doyle.la> 2020-02-13 5:14:15.0 +00:00:00
committer Jordan Doyle <jordan@doyle.la> 2020-02-13 5:28:48.0 +00:00:00
commit
764be4719b7fac766f6a436bde018bd525e6519c [patch]
tree
49ffad17b4d6b663851ecc8fb215571736dc2b66
parent
c485221c479e4c7b43f505219e444220abd262e3
download
764be4719b7fac766f6a436bde018bd525e6519c.tar.gz

SOC between the core "stork" functionality and the http backend

This makes it possible to implement a "storker" for other protocols
in a fairly straightforward way.

Diff

 .gitignore                |   2 +-
 Cargo.toml                |   2 +-
 crawler/Cargo.toml        |  23 -----------------------
 src/main.rs               |  17 ++++++++++-------
 stork/Cargo.toml          |  17 +++++++++++++++++
 stork_http/Cargo.toml     |  22 ++++++++++++++++++++++
 crawler/src/errors.rs     |  14 --------------
 crawler/src/filters.rs    |  91 --------------------------------------------------------------------------------
 crawler/src/lib.rs        | 194 --------------------------------------------------------------------------------
 stork/src/errors.rs       |   5 +++++
 stork/src/filters.rs      |  81 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 stork/src/lib.rs          | 142 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 stork_http/src/errors.rs  |   9 +++++++++
 stork_http/src/filters.rs |  68 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 stork_http/src/lib.rs     | 186 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 15 files changed, 542 insertions(+), 331 deletions(-)

diff --git a/.gitignore b/.gitignore
index 48e3cba..1bc34f6 100644
--- a/.gitignore
+++ a/.gitignore
@@ -1,5 +1,5 @@
/target
/crawler/target
/stork*/target
**/*.rs.bk
.idea/
Cargo.lock
diff --git a/Cargo.toml b/Cargo.toml
index f25b975..83b0950 100644
--- a/Cargo.toml
+++ a/Cargo.toml
@@ -14,4 +14,4 @@
meowhash = ""
generic-array = ""

stork = { path = "crawler" }
stork_http = { path = "stork_http" }
diff --git a/crawler/Cargo.toml b/crawler/Cargo.toml
deleted file mode 100644
index da97daf..0000000 100644
--- a/crawler/Cargo.toml
+++ /dev/null
@@ -1,23 +1,0 @@
[package]
name = "stork"
version = "0.0.1"
authors = ["Jordan Doyle <jordan@doyle.la>"]
edition = "2018"

[dependencies]
select = ""
reqwest = { version = "0.10.1", features = ["gzip"] }
url = ""

failure = ""
failure_derive = ""

futures = "0.3"
async-stream = ""

digest = ""
meowhash = ""
generic-array = ""

[dev-dependencies]
tokio = { version = "0.2", features = ["full"] }
diff --git a/src/main.rs b/src/main.rs
index 067506d..11e7502 100644
--- a/src/main.rs
+++ a/src/main.rs
@@ -1,27 +1,30 @@
use futures::pin_mut;
use futures::stream::StreamExt;
use stork::filters::{UrlFilter, UrlFilterType};

#[tokio::main]
async fn main() -> failure::Fallible<()> {
    let args: Vec<String> = std::env::args().collect();
    let url = args.get(1).expect("Expecting URL parameter").parse().unwrap();

    let stream = stork::Storkable::new(url).exec();
    let url = args
        .get(1)
        .expect("Expecting URL parameter")
        .parse()
        .unwrap();

    let stream = stork_http::HttpStorkable::new(url).exec();
    pin_mut!(stream); // needed for iteration

    while let Some(link) = stream.next().await {
        let link = link?;

        println!("{}", link.url());
        println!("{:?}", link.val());

        let stream = link.exec();
        pin_mut!(stream); // needed for iteration

        while let Some(link) = stream.next().await {
            println!("> {}", link?.url());
            println!("> {:?}", link?.val());
        }
    }

    Ok(())
}
}
diff --git a/stork/Cargo.toml b/stork/Cargo.toml
new file mode 100644
index 0000000..bdea895 100644
--- /dev/null
+++ a/stork/Cargo.toml
@@ -1,0 +1,17 @@
[package]
name = "stork"
version = "0.0.1"
authors = ["Jordan Doyle <jordan@doyle.la>"]
edition = "2018"

[dependencies]
failure = ""
failure_derive = ""

dyn-clone = ""

futures = "0.3"
async-stream = ""

[dev-dependencies]
tokio = { version = "0.2", features = ["full"] }
diff --git a/stork_http/Cargo.toml b/stork_http/Cargo.toml
new file mode 100644
index 0000000..014cfd1 100644
--- /dev/null
+++ a/stork_http/Cargo.toml
@@ -1,0 +1,22 @@
[package]
name = "stork_http"
version = "0.0.1"
authors = ["Jordan Doyle <jordan@doyle.la>"]
edition = "2018"

[dependencies]
stork = { path = "../stork" }

select = ""
reqwest = { version = "0.10.1", features = ["gzip"] }
url = ""

failure = ""
failure_derive = ""

futures = "0.3"
async-stream = ""

[dev-dependencies]
stork = { path = "../stork" }
tokio = { version = "0.2", features = ["full"] }
diff --git a/crawler/src/errors.rs b/crawler/src/errors.rs
deleted file mode 100644
index d1cdc5d..0000000 100644
--- a/crawler/src/errors.rs
+++ /dev/null
@@ -1,14 +1,0 @@
// This is a new error type that you've created. It represents the ways a
// toolchain could be invalid.
//
// The custom derive for Fail derives an impl of both Fail and Display.
// We don't do any other magic like creating new types.
#[derive(Debug, Fail)]
pub enum StorkError {
    #[fail(display = "failed to parse url")]
    UrlParseError,
    #[fail(display = "failed to parse html")]
    HtmlParseError,
    #[fail(display = "failed to send http request")]
    HttpError,
}
diff --git a/crawler/src/filters.rs b/crawler/src/filters.rs
deleted file mode 100644
index b2dd894..0000000 100644
--- a/crawler/src/filters.rs
+++ /dev/null
@@ -1,91 +1,0 @@
use url::Url;

/// List of filters that can be used to filter down results from a

/// [Storkable](crate::Storkable). Once constructed, these can be

/// attached using [Storkable::with_filters](crate::Storkable::with_filters).

#[derive(Debug, Clone)]
pub struct FilterSet {
    url: Option<Vec<UrlFilter>>,
}
impl FilterSet {
    /// Filter results by a URL predicate.

    pub fn add_url_filter(mut self, filter: UrlFilter) -> Self {
        if self.url.is_none() {
            self.url = Some(Vec::new());
        }

        // unwrap can't panic here because we filled the value above
        self.url.as_mut().unwrap().push(filter);

        self
    }

    /// Check if this `Filters` matches the given `link`.

    pub(crate) fn matches_url(&self, link: &Url) -> bool {
        if let Some(filters) = &self.url {
            for filter in filters.iter() {
                if !filter.matches(&link) {
                    return false;
                }
            }
        }

        true
    }
}
impl Default for FilterSet {
    /// Creates an empty filter set.

    fn default() -> Self {
        FilterSet { url: None }
    }
}

#[derive(Debug, Clone)]
pub enum FilterType {
    StartsWith,
    EndsWith,
    Contains,
}

#[derive(Debug, Clone)]
pub enum UrlFilterType {
    Path(FilterType),
    Domain,
    Scheme,
}

#[derive(Debug, Clone)]
pub struct UrlFilter {
    kind: UrlFilterType,
    value: String,
    negated: bool,
}
impl UrlFilter {
    pub fn new(kind: UrlFilterType, value: String) -> Self {
        Self {
            kind,
            value,
            negated: false,
        }
    }

    pub fn negated(mut self) -> Self {
        self.negated = true;
        self
    }

    pub fn matches(&self, url: &Url) -> bool {
        let matches = match &self.kind {
            UrlFilterType::Path(FilterType::StartsWith) => url.path().starts_with(&self.value),
            UrlFilterType::Path(FilterType::EndsWith) => url.path().ends_with(&self.value),
            UrlFilterType::Path(FilterType::Contains) => url.path().contains(&self.value),
            UrlFilterType::Domain => url.host_str().map_or(false, |v| v == &self.value),
            UrlFilterType::Scheme => url.scheme() == &self.value,
        };

        match self.negated {
            true => !matches,
            false => matches,
        }
    }
}
diff --git a/crawler/src/lib.rs b/crawler/src/lib.rs
deleted file mode 100644
index 0d1fb19..0000000 100644
--- a/crawler/src/lib.rs
+++ /dev/null
@@ -1,194 +1,0 @@
//! `stork` is a simple library to recursively crawl websites for links

//! in a search engine-like fashion. stork was designed from the ground

//! to have a simple API that is easy to use.

//!

//! Your entry point into stork is the [Storkable::new] function. Have

//! a look through the [Storkable] struct's documentation for your

//! entry into the world of storking.

#![recursion_limit = "512"]

#[macro_use]
extern crate failure_derive;

pub mod errors;
pub mod filters;

pub use errors::StorkError;
pub use filters::FilterSet;

pub use url::Url;

use select::document::Document;
use select::predicate::{And, Attr, Name, Not};

use async_stream::try_stream;
use futures::pin_mut;
use futures::prelude::*;
use std::sync::Arc;

use failure::Error;
use failure::ResultExt;

/// A [Storkable] represents a "thing" (currently just a website link)

/// which is traversable.

///

/// To start "storking" a website an initial [Storkable] can be

/// constructed with [Storkable::new], once initialised filters can be

/// added using [Storkable::with_filters].

///

/// After a [Storkable] has been initialised, the storking can begin

/// with a call to [Storkable::exec] which will return a

/// stream of more [Storkable]s (with the filters from the parent

/// [Storkable] copied) which in turn can also be storked if necessary.

///

/// Example usage:

///

/// ```

/// # use failure::err_msg;

/// # use stork::{Storkable, FilterSet, filters::{UrlFilter, UrlFilterType}};

/// # use futures::StreamExt;

/// #

/// # #[tokio::main]

/// # async fn main() -> failure::Fallible<()> {

/// let stream = Storkable::new("https://example.com/".parse()?)

///     .with_filters(

///         FilterSet::default()

///             .add_url_filter(UrlFilter::new(UrlFilterType::Domain, String::from("www.iana.org")))

///             .add_url_filter(UrlFilter::new(UrlFilterType::Scheme, String::from("https")))

///     )

///     .exec();

/// # futures::pin_mut!(stream); // needed for iteration

/// let first_link: Storkable = stream.next().await.ok_or(err_msg("no links on page"))??;

/// assert_eq!(first_link.url().as_str(), "https://www.iana.org/domains/example");

/// assert_eq!(first_link.parent().unwrap().url().as_str(), "https://example.com/");

///

/// let stream = first_link.exec();

/// # futures::pin_mut!(stream); // needed for iteration

/// let inner_link = stream.next().await.ok_or(err_msg("no links on page"))??;

/// assert_eq!(inner_link.url().as_str(), "https://www.iana.org/");

/// assert_eq!(inner_link.parent().unwrap().url().as_str(), "https://www.iana.org/domains/example");

/// # Ok(())

/// # }

/// ```

#[derive(Debug, Clone)]
pub struct Storkable {
    url: Url,
    filters: Arc<FilterSet>,
    client: Arc<reqwest::Client>,
    parent: Option<Arc<Storkable>>,
}
impl Storkable {
    /// Instantiates a new [Storkable] from a [Url], storking can then

    /// begin on the given [Url] using the [Storkable::exec] method.

    pub fn new(url: Url) -> Self {
        Self {
            url,
            filters: Arc::new(FilterSet::default()),
            client: Arc::new(
                reqwest::Client::builder()
                    .user_agent(concat!(
                        env!("CARGO_PKG_NAME"),
                        "/",
                        env!("CARGO_PKG_VERSION")
                    ))
                    .build()
                    .unwrap(),
            ),
            parent: None,
        }
    }

    /// Attaches a [FilterSet] to this, and child, [Storkable]s.

    pub fn with_filters(mut self, filters: FilterSet) -> Self {
        self.filters = Arc::new(filters);
        self
    }

    /// Set a custom [reqwest::Client] to use with this, and child,

    /// [Storkable]s.

    pub fn with_client(mut self, client: reqwest::Client) -> Self {
        self.client = Arc::new(client);
        self
    }

    /// Get the URL of this [Storkable].

    pub fn url(&self) -> &Url {
        &self.url
    }

    /// Get the [Storkable] from which this [Storkable] was found on.

    pub fn parent(&self) -> Option<&Storkable> {
        // map to Arc::as_ref to hide the underlying Arc implementation
        self.parent.as_ref().map(Arc::as_ref)
    }

    /// Start storking this [Storkable].

    ///

    /// Finds all the followable links on this [Storkable] and returns

    /// a stream of more [Storkable]s with the same filters and the

    /// `parent` set to a reference of the current [Storkable].

    pub fn exec<'a>(self) -> impl futures::Stream<Item = Result<Storkable, Error>> + 'a {
        let this = Arc::new(self);

        try_stream! {
            let links = get_all_links_from_page(&this);
            pin_mut!(links); // needed for iteration

            while let Some(link) = links.next().await {
                let link = link?;

                if !this.filters.matches_url(&link.url) {
                    continue;
                }

                yield Storkable {
                    url: link.url,
                    client: Arc::clone(&this.client),
                    filters: Arc::clone(&this.filters),
                    parent: Some(Arc::clone(&this)),
                };
            }
        }
    }
}

struct PageLink {
    pub name: String,
    pub url: Url,
}

/// Sends a request to the [Storkable::url] and grabs all followable

/// links from it.

fn get_all_links_from_page<'a>(
    storkable: &'a Storkable,
) -> impl futures::Stream<Item = Result<PageLink, Error>> + 'a {
    try_stream! {
        let root = storkable.url.clone();

        // TODO: can we get this to stream into the Document? need some
        // TODO: compat layer between futures and std::io::Read
        let doc = storkable.client.get(root.clone())
            .send().await.context(StorkError::HttpError)?
            .bytes().await.context(StorkError::HttpError)?;
        let document = Document::from_read(&doc[..]).context(StorkError::HtmlParseError)?;

        for node in document.find(And(Name("a"), Not(Attr("rel", "nofollow")))) {
            let title = node.text().trim().to_string();
            let href = node.attr("href");

            if let Some(href) = href {
                // if this looks like a relative url append it to the root
                let href = if href.starts_with('/') || !href.contains("://") {
                    root.join(href).context(StorkError::UrlParseError)?
                } else {
                    Url::parse(href).context(StorkError::UrlParseError)?
                };

                yield PageLink {
                    name: title,
                    url: href,
                };
            }
        }
    }
}
diff --git a/stork/src/errors.rs b/stork/src/errors.rs
new file mode 100644
index 0000000..8650a9a 100644
--- /dev/null
+++ a/stork/src/errors.rs
@@ -1,0 +1,5 @@
#[derive(Debug, Fail)]
pub enum StorkError {
    #[fail(display = "error whilst fetching link from StorkClient")]
    ClientError,
}
diff --git a/stork/src/filters.rs b/stork/src/filters.rs
new file mode 100644
index 0000000..babcabf 100644
--- /dev/null
+++ a/stork/src/filters.rs
@@ -1,0 +1,81 @@
/// List of filters that can be used to filter down results from a

/// [Storkable](crate::Storkable). Once constructed, these can be

/// attached using [Storkable::with_filters](crate::Storkable::with_filters).

#[derive(Debug)]
pub struct FilterSet<T> {
    filters: Option<Vec<Box<dyn Filter<T>>>>,
}
impl<T> FilterSet<T> {
    /// Filter results by a given predicate.

    pub fn add_filter<F: Filter<T> + 'static>(mut self, filter: F) -> Self {
        if self.filters.is_none() {
            self.filters = Some(Vec::new());
        }

        // unwrap can't panic here because we filled the value above
        self.filters.as_mut().unwrap().push(Box::new(filter));

        self
    }

    /// Check if this `Filters` matches the given `link`.

    pub(crate) fn matches(&self, val: &T) -> bool {
        if let Some(filters) = &self.filters {
            for filter in filters.iter() {
                if !filter.matches(&val) {
                    return false;
                }
            }
        }

        true
    }
}
impl<T> Default for FilterSet<T> {
    /// Creates an empty filter set.

    fn default() -> Self {
        FilterSet { filters: None }
    }
}
/// We need to manually implement [Clone] for this struct because

/// otherwise it won't be derived on values where T doesn't implement

/// Clone (which would be an unnecessary restriction on our API as T

/// is a type param on a method).

impl<T> Clone for FilterSet<T> {
    fn clone(&self) -> Self {
        Self {
            filters: self.filters.clone(),
        }
    }
}

/// Predicate for any values of <T> passing through a

/// [Storkable](crate::Storkable). See [html_filters] for example

/// implementations.

///

/// Note: *all* implementations of `Filter` should have an impl of

/// [Clone] so they can be passed to children and modified without

/// modifying FilterSets on parents.

///

/// [html_filters]: (../stork_html/filters)

pub trait Filter<T>: std::fmt::Debug + dyn_clone::DynClone {
    fn matches(&self, val: &T) -> bool;
}

/// we need to use dyn_clone's impl of cloning a boxed dynamically

/// dispatched trait because implementing it involves a bit of unsafe

/// code with recent changes to the compiler, so we'll trust them to

/// handle it.

impl<T> std::clone::Clone for Box<dyn Filter<T>> {
    fn clone(&self) -> Self {
        dyn_clone::clone_box(self.as_ref())
    }
}

#[derive(Debug, Clone)]
pub enum FilterType {
    StartsWith,
    EndsWith,
    Contains,
    Equals
}
diff --git a/stork/src/lib.rs b/stork/src/lib.rs
new file mode 100644
index 0000000..5cd5b76 100644
--- /dev/null
+++ a/stork/src/lib.rs
@@ -1,0 +1,142 @@
//! `stork` is a simple futures-based library to recursively crawl

//! sources in a search engine-like fashion. stork was designed from the

//! ground to have a simple API that is easy to use and can be reused

//! across multiple protocols, yielding each result giving end users the

//! freedom to do BFS, DFS or any type of search they may so wish.

//!

//! Your entry point into stork is the [Storkable::new] function. Have

//! a look through the [Storkable] struct's documentation for your

//! entry into the world of storking.

//!

//! *Note: you're probably not looking for this library on its own but

//! a protocol implementation of it. See below for some first-party

//! implementations:*

//! - [stork_http](../stork_http/index.html)

#![recursion_limit = "256"]

#[macro_use]
extern crate failure_derive;

pub mod errors;
pub mod filters;

pub use errors::StorkError;
pub use filters::FilterSet;

use async_stream::try_stream;
use futures::prelude::*;

use std::pin::Pin;
use std::sync::Arc;

use failure::Error;
use failure::ResultExt;

/// A [Storkable] represents a "thing" which is traversable ("storkable").

///

/// To start "storking" an initial [Storkable] can be constructed with

/// with [Storkable::new], once initialised filters can be added using

/// [Storkable::with_filters].

///

/// After a [Storkable] has been initialised, the storking can begin

/// with a call to [Storkable::exec] which will return a

/// stream of more [Storkable]s (with the filters from the parent

/// [Storkable] copied) which in turn can also be storked if necessary.

///

/// A Storkable derives its functionality from its two generics,

/// `T` and `C: StorkClient<T>`. The `StorkClient` implementation will

/// be called with a value of `T`, and is expected to return all the

/// values of `T` that can be found on the given `T`.

#[derive(Debug, Clone)]
pub struct Storkable<T: Unpin, C: StorkClient<T>> {
    value: T,
    filters: FilterSet<T>,
    client: Arc<C>,
    parent: Option<Arc<Storkable<T, C>>>,
}

impl<'a, T: Unpin + 'a, C: StorkClient<T> + 'a> Storkable<T, C> {
    /// Instantiates a new [Storkable] from a T, storking can then

    /// begin on the given entrypoint using the [Storkable::exec] method.

    pub fn new(val: T) -> Self {
        Self {
            value: val,
            filters: FilterSet::default(),
            client: Arc::new(C::default()),
            parent: None,
        }
    }

    /// Attaches a [FilterSet] to this [Storkable] and any children

    /// found after executing this one.

    pub fn with_filters(mut self, filters: FilterSet<T>) -> Self {
        self.filters = filters;
        self
    }

    /// Replaces the default [StorkClient] with a new one accepting

    /// and returning the same type for this [Storkable].

    pub fn with_client(mut self, client: C) -> Self {
        self.client = Arc::new(client);
        self
    }

    // Grab a reference to the filters set on this [Storkable].
    pub fn filters(&self) -> &FilterSet<T> {
        &self.filters
    }

    /// Get the value of this [Storkable].

    pub fn val(&self) -> &T {
        &self.value
    }

    /// Get the [Storkable] from which this [Storkable] was found on.

    pub fn parent(&self) -> Option<&Storkable<T, C>> {
        // map to Arc::as_ref to hide the underlying Arc implementation
        self.parent.as_ref().map(Arc::as_ref)
    }

    /// Start storking this [Storkable].

    ///

    /// Finds all the followable links on this [Storkable] and returns

    /// a stream of more [Storkable]s with the same filters and the

    /// `parent` set to a reference of the current [Storkable].

    pub fn exec<'b>(self) -> impl futures::Stream<Item = Result<Storkable<T, C>, Error>> + 'a {
        let this = Arc::new(self);

        try_stream! {
            let mut children = this.client.run(this.val());

            while let Some(child) = children.next().await {
                let child = child.context(StorkError::ClientError)?;

                if !this.filters.matches(&child) {
                    continue;
                }

                yield Storkable {
                    value: child,
                    client: Arc::clone(&this.client),
                    filters: this.filters.clone(),
                    parent: Some(Arc::clone(&this)),
                };
            }
        }
    }
}

/// A [StorkClient] is an underlying implementation of a storker. When a

/// [Storkable] is initialised a [StorkClient] will be created using

/// [Default::default] and the instance will be shared between all child

/// [Storkable]s.

///

/// The default [StorkClient] initialised by the [Storkable] can be

/// replaced using [Storkable::with_client].

///

/// [StorkClient]s may be used across threads and *must* be thread-safe.

pub trait StorkClient<T>: Default {
    /// Makes a call to `T` and returns the child `T`s it can find on the

    /// page.

    fn run(&self, src: &T) -> Pin<Box<dyn futures::Stream<Item = Result<T, Error>>>>;
}
diff --git a/stork_http/src/errors.rs b/stork_http/src/errors.rs
new file mode 100644
index 0000000..3f80bb9 100644
--- /dev/null
+++ a/stork_http/src/errors.rs
@@ -1,0 +1,9 @@
#[derive(Debug, Fail)]
pub enum StorkHttpError {
    #[fail(display = "failed to parse url")]
    UrlParseError,
    #[fail(display = "failed to parse html")]
    HtmlParseError,
    #[fail(display = "failed to send http request")]
    HttpError,
}
diff --git a/stork_http/src/filters.rs b/stork_http/src/filters.rs
new file mode 100644
index 0000000..f4210e0 100644
--- /dev/null
+++ a/stork_http/src/filters.rs
@@ -1,0 +1,68 @@
pub use stork::filters::FilterType;

use std::borrow::Cow;

use stork::filters::Filter;

use crate::Link;

#[derive(Debug, Clone)]
pub enum UrlFilterType {
    Path(FilterType),
    Domain,
    Scheme,
}

#[derive(Debug, Clone)]
pub struct DomainFilter<'a>(Cow<'a, str>);
impl<'a> DomainFilter<'a> {
    pub fn new<V: Into<Cow<'a, str>>>(value: V) -> Self {
        Self(value.into())
    }
}
impl<'a> Filter<Link> for DomainFilter<'a> {
    fn matches(&self, link: &Link) -> bool {
        link.url()
            .host_str()
            .map_or(false, |v| v == self.0.as_ref())
    }
}

#[derive(Debug, Clone)]
pub struct SchemeFilter<'a>(Cow<'a, str>);
impl<'a> SchemeFilter<'a> {
    pub fn new<V: Into<Cow<'a, str>>>(value: V) -> Self {
        Self(value.into())
    }
}
impl<'a> Filter<Link> for SchemeFilter<'a> {
    fn matches(&self, link: &Link) -> bool {
        link.url().scheme() == self.0.as_ref()
    }
}

#[derive(Debug, Clone)]
pub struct PathFilter<'a> {
    value: Cow<'a, str>,
    kind: FilterType,
}
impl<'a> PathFilter<'a> {
    pub fn new<V: Into<Cow<'a, str>>>(kind: FilterType, value: V) -> Self {
        Self {
            kind,
            value: value.into(),
        }
    }
}
impl<'a> Filter<Link> for PathFilter<'a> {
    fn matches(&self, link: &Link) -> bool {
        let url = link.url();

        match &self.kind {
            FilterType::StartsWith => url.path().starts_with(self.value.as_ref()),
            FilterType::EndsWith => url.path().ends_with(self.value.as_ref()),
            FilterType::Contains => url.path().contains(self.value.as_ref()),
            FilterType::Equals => url.path() == self.value.as_ref(),
        }
    }
}
diff --git a/stork_http/src/lib.rs b/stork_http/src/lib.rs
new file mode 100644
index 0000000..399e7f1 100644
--- /dev/null
+++ a/stork_http/src/lib.rs
@@ -1,0 +1,186 @@
//! # stork_http

//! This is a [stork](../stork/index.html) implementation for the HTTP

//! protocol and specifically HTML-based web scraping. Given an initial

//! page to scrape, stork_http will find all indexable links on the page

//! and yield them back to you - ready to scrape again in an instant

//! or store for later to come back to at another time, all using futures

//! to allow for parallel processing.

//!

//! At this time `rel="nofollow"` is strictly enforced and not possible

//! to change although this will come in time as more filters are added.

//!

//! Example usage:

//!

//! ```

//! # use stork::FilterSet;

//! # use failure::err_msg;

//! # use stork_http::{HttpStorkable, filters::*};

//! # use futures::StreamExt;

//! #

//! # #[tokio::main]

//! # async fn main() -> failure::Fallible<()> {

//! // start scanning https://example.com/ for links with the given filters

//! let stream = HttpStorkable::new("https://example.com/".parse()?)

//!     .with_filters(

//!         FilterSet::default()

//!             .add_filter(DomainFilter::new("www.iana.org"))

//!             .add_filter(SchemeFilter::new("https"))

//!     )

//!     .exec();

//! # futures::pin_mut!(stream); // needed for iteration

//! // get the first link from example.com and ensure its the one we expected

//! // it to be

//! let first_link_on_example: HttpStorkable = match stream.next().await {

//!     Some(Ok(link)) => {

//!         assert_eq!(link.val().text(), Some("More information...".to_string()));

//!         assert_eq!(link.val().url().as_str(), "https://www.iana.org/domains/example");

//!         assert_eq!(link.parent().unwrap().val().url().as_str(), "https://example.com/");

//!         link

//!     },

//!     _ => panic!("failed to get links from page")

//! };

//!

//! // add another filter looking for the root path and start scanning for links

//! let filters = first_link_on_example.filters().clone()

//!     .add_filter(PathFilter::new(FilterType::Equals, "/"));

//! let stream = first_link_on_example

//!     .with_filters(filters)

//!     .exec();

//! # futures::pin_mut!(stream); // needed for iteration

//! // get the first link from the stream and ensure its a link to the homepage

//! match stream.next().await {

//!     Some(Ok(link)) => {

//!         assert_eq!(link.val().url().as_str(), "https://www.iana.org/");

//!         assert_eq!(link.parent().unwrap().val().url().as_str(), "https://www.iana.org/domains/example");

//!         assert_eq!(link.parent().unwrap().parent().unwrap().val().url().as_str(), "https://example.com/")

//!     },

//!     _ => panic!("failed to get links from page")

//! }

//! // ensure theres no other unexpected links on the homepage

//! assert!(stream.next().await.is_none(), "should've been only one homepage link on the page!");

//! # Ok(())

//! # }

//! ```


#![recursion_limit = "256"]

#[macro_use]
extern crate failure_derive;

mod errors;
pub mod filters;

pub use errors::StorkHttpError;
pub use url::Url;

use stork::{StorkClient, Storkable};

use std::pin::Pin;

use select::document::Document;
use select::predicate::{And, Attr, Name, Not};

use async_stream::try_stream;

use failure::Error;
use failure::ResultExt;

use std::sync::Arc;

pub use reqwest::Client as ReqwestClient;

pub type HttpStorkable = Storkable<Link, HttpStorkClient>;

#[derive(Debug)]
pub struct Link {
    url: Url,
    text: Option<String>,
}
impl Link {
    pub fn url(&self) -> &Url {
        &self.url
    }

    pub fn text(&self) -> Option<String> {
        self.text.clone()
    }
}
impl std::str::FromStr for Link {
    type Err = failure::Error;

    fn from_str(input: &str) -> Result<Link, Error> {
        Ok(Self {
            url: Url::parse(input).context(StorkHttpError::UrlParseError)?,
            text: None,
        })
    }
}
impl From<Url> for Link {
    fn from(url: Url) -> Self {
        Self { url, text: None }
    }
}

pub struct HttpStorkClient {
    client: Arc<reqwest::Client>,
}

impl HttpStorkClient {
    pub fn new(client: ReqwestClient) -> Self {
        Self {
            client: Arc::new(client),
        }
    }
}

impl Default for HttpStorkClient {
    fn default() -> Self {
        Self {
            client: Arc::new(
                reqwest::Client::builder()
                    .user_agent(concat!(
                        env!("CARGO_PKG_NAME"),
                        "/",
                        env!("CARGO_PKG_VERSION")
                    ))
                    .build()
                    .unwrap(),
            ),
        }
    }
}

impl StorkClient<Link> for HttpStorkClient {
    fn run(&self, src: &Link) -> Pin<Box<dyn futures::Stream<Item = Result<Link, Error>>>> {
        let root = src.url.clone();
        let client = Arc::clone(&self.client);

        Box::pin(try_stream! {
            // TODO: can we get this to stream into the Document? need some
            // TODO: compat layer between futures and std::io::Read
            let doc = client.get(root.clone())
                .send().await.context(StorkHttpError::HttpError)?
                .bytes().await.context(StorkHttpError::HttpError)?;
            let document = Document::from_read(&doc[..]).context(StorkHttpError::HtmlParseError)?;

            for node in document.find(And(Name("a"), Not(Attr("rel", "nofollow")))) {
                let title = node.text().trim().to_string();
                let href = node.attr("href");

                if let Some(href) = href {
                    // if this looks like a relative url append it to the root
                    let href = if href.starts_with('/') || !href.contains("://") {
                        root.join(href).context(StorkHttpError::UrlParseError)?
                    } else {
                        Url::parse(href).context(StorkHttpError::UrlParseError)?
                    };

                    yield Link {
                        url: href,
                        text: Some(title).filter(|x| !x.is_empty())
                    };
                }
            }
        })
    }
}