SOC between the core "stork" functionality and the http backend
This makes it possible to implement a "storker" for other protocols
in a fairly straightforward way.
Diff
.gitignore | 2 +-
Cargo.toml | 2 +-
crawler/Cargo.toml | 23 -----------------------
src/main.rs | 17 ++++++++++-------
stork/Cargo.toml | 17 +++++++++++++++++
stork_http/Cargo.toml | 22 ++++++++++++++++++++++
crawler/src/errors.rs | 14 --------------
crawler/src/filters.rs | 91 --------------------------------------------------------------------------------
crawler/src/lib.rs | 194 --------------------------------------------------------------------------------
stork/src/errors.rs | 5 +++++
stork/src/filters.rs | 81 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
stork/src/lib.rs | 142 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
stork_http/src/errors.rs | 9 +++++++++
stork_http/src/filters.rs | 68 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
stork_http/src/lib.rs | 186 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
15 files changed, 542 insertions(+), 331 deletions(-)
@@ -1,5 +1,5 @@
/target
/crawler/target
/stork*/target
**/*.rs.bk
.idea/
Cargo.lock
@@ -14,4 +14,4 @@
meowhash = ""
generic-array = ""
stork = { path = "crawler" }
stork_http = { path = "stork_http" }
@@ -1,23 +1,0 @@
[package]
name = "stork"
version = "0.0.1"
authors = ["Jordan Doyle <jordan@doyle.la>"]
edition = "2018"
[dependencies]
select = ""
reqwest = { version = "0.10.1", features = ["gzip"] }
url = ""
failure = ""
failure_derive = ""
futures = "0.3"
async-stream = ""
digest = ""
meowhash = ""
generic-array = ""
[dev-dependencies]
tokio = { version = "0.2", features = ["full"] }
@@ -1,27 +1,30 @@
use futures::pin_mut;
use futures::stream::StreamExt;
use stork::filters::{UrlFilter, UrlFilterType};
#[tokio::main]
async fn main() -> failure::Fallible<()> {
let args: Vec<String> = std::env::args().collect();
let url = args.get(1).expect("Expecting URL parameter").parse().unwrap();
let stream = stork::Storkable::new(url).exec();
let url = args
.get(1)
.expect("Expecting URL parameter")
.parse()
.unwrap();
let stream = stork_http::HttpStorkable::new(url).exec();
pin_mut!(stream);
while let Some(link) = stream.next().await {
let link = link?;
println!("{}", link.url());
println!("{:?}", link.val());
let stream = link.exec();
pin_mut!(stream);
while let Some(link) = stream.next().await {
println!("> {}", link?.url());
println!("> {:?}", link?.val());
}
}
Ok(())
}
}
@@ -1,0 +1,17 @@
[package]
name = "stork"
version = "0.0.1"
authors = ["Jordan Doyle <jordan@doyle.la>"]
edition = "2018"
[dependencies]
failure = ""
failure_derive = ""
dyn-clone = ""
futures = "0.3"
async-stream = ""
[dev-dependencies]
tokio = { version = "0.2", features = ["full"] }
@@ -1,0 +1,22 @@
[package]
name = "stork_http"
version = "0.0.1"
authors = ["Jordan Doyle <jordan@doyle.la>"]
edition = "2018"
[dependencies]
stork = { path = "../stork" }
select = ""
reqwest = { version = "0.10.1", features = ["gzip"] }
url = ""
failure = ""
failure_derive = ""
futures = "0.3"
async-stream = ""
[dev-dependencies]
stork = { path = "../stork" }
tokio = { version = "0.2", features = ["full"] }
@@ -1,14 +1,0 @@
#[derive(Debug, Fail)]
pub enum StorkError {
#[fail(display = "failed to parse url")]
UrlParseError,
#[fail(display = "failed to parse html")]
HtmlParseError,
#[fail(display = "failed to send http request")]
HttpError,
}
@@ -1,91 +1,0 @@
use url::Url;
#[derive(Debug, Clone)]
pub struct FilterSet {
url: Option<Vec<UrlFilter>>,
}
impl FilterSet {
pub fn add_url_filter(mut self, filter: UrlFilter) -> Self {
if self.url.is_none() {
self.url = Some(Vec::new());
}
self.url.as_mut().unwrap().push(filter);
self
}
pub(crate) fn matches_url(&self, link: &Url) -> bool {
if let Some(filters) = &self.url {
for filter in filters.iter() {
if !filter.matches(&link) {
return false;
}
}
}
true
}
}
impl Default for FilterSet {
fn default() -> Self {
FilterSet { url: None }
}
}
#[derive(Debug, Clone)]
pub enum FilterType {
StartsWith,
EndsWith,
Contains,
}
#[derive(Debug, Clone)]
pub enum UrlFilterType {
Path(FilterType),
Domain,
Scheme,
}
#[derive(Debug, Clone)]
pub struct UrlFilter {
kind: UrlFilterType,
value: String,
negated: bool,
}
impl UrlFilter {
pub fn new(kind: UrlFilterType, value: String) -> Self {
Self {
kind,
value,
negated: false,
}
}
pub fn negated(mut self) -> Self {
self.negated = true;
self
}
pub fn matches(&self, url: &Url) -> bool {
let matches = match &self.kind {
UrlFilterType::Path(FilterType::StartsWith) => url.path().starts_with(&self.value),
UrlFilterType::Path(FilterType::EndsWith) => url.path().ends_with(&self.value),
UrlFilterType::Path(FilterType::Contains) => url.path().contains(&self.value),
UrlFilterType::Domain => url.host_str().map_or(false, |v| v == &self.value),
UrlFilterType::Scheme => url.scheme() == &self.value,
};
match self.negated {
true => !matches,
false => matches,
}
}
}
@@ -1,194 +1,0 @@
#![recursion_limit = "512"]
#[macro_use]
extern crate failure_derive;
pub mod errors;
pub mod filters;
pub use errors::StorkError;
pub use filters::FilterSet;
pub use url::Url;
use select::document::Document;
use select::predicate::{And, Attr, Name, Not};
use async_stream::try_stream;
use futures::pin_mut;
use futures::prelude::*;
use std::sync::Arc;
use failure::Error;
use failure::ResultExt;
#[derive(Debug, Clone)]
pub struct Storkable {
url: Url,
filters: Arc<FilterSet>,
client: Arc<reqwest::Client>,
parent: Option<Arc<Storkable>>,
}
impl Storkable {
pub fn new(url: Url) -> Self {
Self {
url,
filters: Arc::new(FilterSet::default()),
client: Arc::new(
reqwest::Client::builder()
.user_agent(concat!(
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION")
))
.build()
.unwrap(),
),
parent: None,
}
}
pub fn with_filters(mut self, filters: FilterSet) -> Self {
self.filters = Arc::new(filters);
self
}
pub fn with_client(mut self, client: reqwest::Client) -> Self {
self.client = Arc::new(client);
self
}
pub fn url(&self) -> &Url {
&self.url
}
pub fn parent(&self) -> Option<&Storkable> {
self.parent.as_ref().map(Arc::as_ref)
}
pub fn exec<'a>(self) -> impl futures::Stream<Item = Result<Storkable, Error>> + 'a {
let this = Arc::new(self);
try_stream! {
let links = get_all_links_from_page(&this);
pin_mut!(links);
while let Some(link) = links.next().await {
let link = link?;
if !this.filters.matches_url(&link.url) {
continue;
}
yield Storkable {
url: link.url,
client: Arc::clone(&this.client),
filters: Arc::clone(&this.filters),
parent: Some(Arc::clone(&this)),
};
}
}
}
}
struct PageLink {
pub name: String,
pub url: Url,
}
fn get_all_links_from_page<'a>(
storkable: &'a Storkable,
) -> impl futures::Stream<Item = Result<PageLink, Error>> + 'a {
try_stream! {
let root = storkable.url.clone();
let doc = storkable.client.get(root.clone())
.send().await.context(StorkError::HttpError)?
.bytes().await.context(StorkError::HttpError)?;
let document = Document::from_read(&doc[..]).context(StorkError::HtmlParseError)?;
for node in document.find(And(Name("a"), Not(Attr("rel", "nofollow")))) {
let title = node.text().trim().to_string();
let href = node.attr("href");
if let Some(href) = href {
let href = if href.starts_with('/') || !href.contains("://") {
root.join(href).context(StorkError::UrlParseError)?
} else {
Url::parse(href).context(StorkError::UrlParseError)?
};
yield PageLink {
name: title,
url: href,
};
}
}
}
}
@@ -1,0 +1,5 @@
#[derive(Debug, Fail)]
pub enum StorkError {
#[fail(display = "error whilst fetching link from StorkClient")]
ClientError,
}
@@ -1,0 +1,81 @@
#[derive(Debug)]
pub struct FilterSet<T> {
filters: Option<Vec<Box<dyn Filter<T>>>>,
}
impl<T> FilterSet<T> {
pub fn add_filter<F: Filter<T> + 'static>(mut self, filter: F) -> Self {
if self.filters.is_none() {
self.filters = Some(Vec::new());
}
self.filters.as_mut().unwrap().push(Box::new(filter));
self
}
pub(crate) fn matches(&self, val: &T) -> bool {
if let Some(filters) = &self.filters {
for filter in filters.iter() {
if !filter.matches(&val) {
return false;
}
}
}
true
}
}
impl<T> Default for FilterSet<T> {
fn default() -> Self {
FilterSet { filters: None }
}
}
impl<T> Clone for FilterSet<T> {
fn clone(&self) -> Self {
Self {
filters: self.filters.clone(),
}
}
}
pub trait Filter<T>: std::fmt::Debug + dyn_clone::DynClone {
fn matches(&self, val: &T) -> bool;
}
impl<T> std::clone::Clone for Box<dyn Filter<T>> {
fn clone(&self) -> Self {
dyn_clone::clone_box(self.as_ref())
}
}
#[derive(Debug, Clone)]
pub enum FilterType {
StartsWith,
EndsWith,
Contains,
Equals
}
@@ -1,0 +1,142 @@
#![recursion_limit = "256"]
#[macro_use]
extern crate failure_derive;
pub mod errors;
pub mod filters;
pub use errors::StorkError;
pub use filters::FilterSet;
use async_stream::try_stream;
use futures::prelude::*;
use std::pin::Pin;
use std::sync::Arc;
use failure::Error;
use failure::ResultExt;
#[derive(Debug, Clone)]
pub struct Storkable<T: Unpin, C: StorkClient<T>> {
value: T,
filters: FilterSet<T>,
client: Arc<C>,
parent: Option<Arc<Storkable<T, C>>>,
}
impl<'a, T: Unpin + 'a, C: StorkClient<T> + 'a> Storkable<T, C> {
pub fn new(val: T) -> Self {
Self {
value: val,
filters: FilterSet::default(),
client: Arc::new(C::default()),
parent: None,
}
}
pub fn with_filters(mut self, filters: FilterSet<T>) -> Self {
self.filters = filters;
self
}
pub fn with_client(mut self, client: C) -> Self {
self.client = Arc::new(client);
self
}
pub fn filters(&self) -> &FilterSet<T> {
&self.filters
}
pub fn val(&self) -> &T {
&self.value
}
pub fn parent(&self) -> Option<&Storkable<T, C>> {
self.parent.as_ref().map(Arc::as_ref)
}
pub fn exec<'b>(self) -> impl futures::Stream<Item = Result<Storkable<T, C>, Error>> + 'a {
let this = Arc::new(self);
try_stream! {
let mut children = this.client.run(this.val());
while let Some(child) = children.next().await {
let child = child.context(StorkError::ClientError)?;
if !this.filters.matches(&child) {
continue;
}
yield Storkable {
value: child,
client: Arc::clone(&this.client),
filters: this.filters.clone(),
parent: Some(Arc::clone(&this)),
};
}
}
}
}
pub trait StorkClient<T>: Default {
fn run(&self, src: &T) -> Pin<Box<dyn futures::Stream<Item = Result<T, Error>>>>;
}
@@ -1,0 +1,9 @@
#[derive(Debug, Fail)]
pub enum StorkHttpError {
#[fail(display = "failed to parse url")]
UrlParseError,
#[fail(display = "failed to parse html")]
HtmlParseError,
#[fail(display = "failed to send http request")]
HttpError,
}
@@ -1,0 +1,68 @@
pub use stork::filters::FilterType;
use std::borrow::Cow;
use stork::filters::Filter;
use crate::Link;
#[derive(Debug, Clone)]
pub enum UrlFilterType {
Path(FilterType),
Domain,
Scheme,
}
#[derive(Debug, Clone)]
pub struct DomainFilter<'a>(Cow<'a, str>);
impl<'a> DomainFilter<'a> {
pub fn new<V: Into<Cow<'a, str>>>(value: V) -> Self {
Self(value.into())
}
}
impl<'a> Filter<Link> for DomainFilter<'a> {
fn matches(&self, link: &Link) -> bool {
link.url()
.host_str()
.map_or(false, |v| v == self.0.as_ref())
}
}
#[derive(Debug, Clone)]
pub struct SchemeFilter<'a>(Cow<'a, str>);
impl<'a> SchemeFilter<'a> {
pub fn new<V: Into<Cow<'a, str>>>(value: V) -> Self {
Self(value.into())
}
}
impl<'a> Filter<Link> for SchemeFilter<'a> {
fn matches(&self, link: &Link) -> bool {
link.url().scheme() == self.0.as_ref()
}
}
#[derive(Debug, Clone)]
pub struct PathFilter<'a> {
value: Cow<'a, str>,
kind: FilterType,
}
impl<'a> PathFilter<'a> {
pub fn new<V: Into<Cow<'a, str>>>(kind: FilterType, value: V) -> Self {
Self {
kind,
value: value.into(),
}
}
}
impl<'a> Filter<Link> for PathFilter<'a> {
fn matches(&self, link: &Link) -> bool {
let url = link.url();
match &self.kind {
FilterType::StartsWith => url.path().starts_with(self.value.as_ref()),
FilterType::EndsWith => url.path().ends_with(self.value.as_ref()),
FilterType::Contains => url.path().contains(self.value.as_ref()),
FilterType::Equals => url.path() == self.value.as_ref(),
}
}
}
@@ -1,0 +1,186 @@
#![recursion_limit = "256"]
#[macro_use]
extern crate failure_derive;
mod errors;
pub mod filters;
pub use errors::StorkHttpError;
pub use url::Url;
use stork::{StorkClient, Storkable};
use std::pin::Pin;
use select::document::Document;
use select::predicate::{And, Attr, Name, Not};
use async_stream::try_stream;
use failure::Error;
use failure::ResultExt;
use std::sync::Arc;
pub use reqwest::Client as ReqwestClient;
pub type HttpStorkable = Storkable<Link, HttpStorkClient>;
#[derive(Debug)]
pub struct Link {
url: Url,
text: Option<String>,
}
impl Link {
pub fn url(&self) -> &Url {
&self.url
}
pub fn text(&self) -> Option<String> {
self.text.clone()
}
}
impl std::str::FromStr for Link {
type Err = failure::Error;
fn from_str(input: &str) -> Result<Link, Error> {
Ok(Self {
url: Url::parse(input).context(StorkHttpError::UrlParseError)?,
text: None,
})
}
}
impl From<Url> for Link {
fn from(url: Url) -> Self {
Self { url, text: None }
}
}
pub struct HttpStorkClient {
client: Arc<reqwest::Client>,
}
impl HttpStorkClient {
pub fn new(client: ReqwestClient) -> Self {
Self {
client: Arc::new(client),
}
}
}
impl Default for HttpStorkClient {
fn default() -> Self {
Self {
client: Arc::new(
reqwest::Client::builder()
.user_agent(concat!(
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION")
))
.build()
.unwrap(),
),
}
}
}
impl StorkClient<Link> for HttpStorkClient {
fn run(&self, src: &Link) -> Pin<Box<dyn futures::Stream<Item = Result<Link, Error>>>> {
let root = src.url.clone();
let client = Arc::clone(&self.client);
Box::pin(try_stream! {
let doc = client.get(root.clone())
.send().await.context(StorkHttpError::HttpError)?
.bytes().await.context(StorkHttpError::HttpError)?;
let document = Document::from_read(&doc[..]).context(StorkHttpError::HtmlParseError)?;
for node in document.find(And(Name("a"), Not(Attr("rel", "nofollow")))) {
let title = node.text().trim().to_string();
let href = node.attr("href");
if let Some(href) = href {
let href = if href.starts_with('/') || !href.contains("://") {
root.join(href).context(StorkHttpError::UrlParseError)?
} else {
Url::parse(href).context(StorkHttpError::UrlParseError)?
};
yield Link {
url: href,
text: Some(title).filter(|x| !x.is_empty())
};
}
}
})
}
}