From 137b6fab39e49e00ac7b7ff5c3c7fa0567faf75c Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Sat, 15 Feb 2020 16:18:45 +0000 Subject: [PATCH] storkcli: Process streams from stork async instead of generating a hierarchy synchronously I'm sure this'll come back in a future update but for now this massively speeds up link gathering using storkcli. --- storkcli/src/main.rs | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 52 insertions(+), 24 deletions(-) diff --git a/storkcli/src/main.rs b/storkcli/src/main.rs index 90895ad..cd33327 100644 --- a/storkcli/src/main.rs +++ b/storkcli/src/main.rs @@ -1,6 +1,11 @@ +use std::hash::{Hash, Hasher}; + use futures::{pin_mut, StreamExt}; -use std::collections::VecDeque; -use stork_http::{HttpStorkable, Link}; + +use failure::Fallible; + +use stork::FilterSet; +use stork_http::{filters::*, HttpStorkable, Link}; #[derive(argh::FromArgs)] /// Link hunter with a little bit of magic. @@ -11,31 +16,52 @@ struct Args { /// follow. max_depth: Option, + #[argh(switch, short = 'o')] + /// only grab links from the same origin, useful for creating + /// sitemaps + same_origin: bool, + #[argh(positional)] url: Link, } +fn make_tuple_fn( + depth: usize, +) -> impl Fn(failure::Fallible) -> (Fallible, usize) { + move |v| (v, depth) +} + #[tokio::main] async fn main() -> failure::Fallible<()> { let args: Args = argh::from_env(); let url = args.url; - let stream = HttpStorkable::new(url).exec(); - pin_mut!(stream); // needed for iteration + let mut filters = FilterSet::default(); + if args.same_origin { + filters = filters.add_filter(DomainFilter::new(url.url().host().unwrap().to_string())); + } - let mut queue = stream.map(|v| (v, 0)).collect::>().await; + let queue = futures::stream::SelectAll::new(); + pin_mut!(queue); - if queue.is_empty() { - panic!("Failed to find any links on the page!"); - } + // push the initial Storkable onto the queue + queue.push(Box::pin( + HttpStorkable::new(url) + .with_filters(filters) + .exec() + .map(make_tuple_fn(0)), + )); + + let mut seen = Vec::new(); - // TODO: this is very synchronous at the moment loop { - if queue.is_empty() { + let value = queue.next().await; + + if value.is_none() { break; } - let (link, depth) = queue.pop_front().unwrap(); + let (link, depth) = value.unwrap(); if let Err(e) = link { eprintln!("Failed to grab a link: {}", e); @@ -44,7 +70,20 @@ async fn main() -> failure::Fallible<()> { let link = link.unwrap(); - println!("{}↳ {}", " ".repeat(depth), link.val().url()); + // TODO: see if we can do this in a filter before we even make + // TODO: it into the synchronous print loop + let hash = { + let mut hash = twox_hash::XxHash64::default(); + link.val().hash(&mut hash); + hash.finish() + }; + if seen.contains(&hash) { + continue; + } else { + seen.push(hash); + } + + println!("{}", link.val().url()); if let Some(max_depth) = args.max_depth { if depth >= max_depth { @@ -54,18 +93,7 @@ async fn main() -> failure::Fallible<()> { // add children of this storkable to the front of the queue with // 1 depth added on - let children = link.exec(); - pin_mut!(children); - - while let Some(v) = children.next().await { - queue.push_front((v, depth + 1)); - } - - // TODO: get the library returning futures 0.3 asap - // link.exec() - // .map(|v| (v, depth + 1)) - // .for_each(|v| queue.push_front(v)) - // .await; + queue.push(Box::pin(link.exec().map(make_tuple_fn(depth + 1)))); } Ok(()) -- libgit2 1.7.2