🏡 index : ~doyle/blocks.ls.git

author Jordan Doyle <jordan@doyle.la> 2022-05-23 0:34:46.0 +00:00:00
committer Jordan Doyle <jordan@doyle.la> 2022-05-23 0:40:52.0 +00:00:00
commit
e5f05996db224871a1749326c18ded1b250fcb06 [patch]
tree
1261c1406e3b7f532c455d8396ebcaf22451b7a9
parent
9dc3110ee521662ce5059dfe9f467622734be489
download
e5f05996db224871a1749326c18ded1b250fcb06.tar.gz

Add extra tuning configs to the indexer



Diff

 indexer/src/main.rs | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/indexer/src/main.rs b/indexer/src/main.rs
index 44507d1..926f331 100644
--- a/indexer/src/main.rs
+++ b/indexer/src/main.rs
@@ -48,7 +48,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

    let start = args.start;

    let (tx, mut rx) = tokio::sync::mpsc::channel::<(u64, BlockHash, Block)>(200);
    let (tx, mut rx) = tokio::sync::mpsc::channel::<(u64, BlockHash, Block)>(args.buffer);

    let start_time = Instant::now();

@@ -63,10 +63,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                    let task: Result<_, _> = task;
                    tx.send(task.unwrap()).await.unwrap();
                }
                _ = async {}, if blocks_fetching.len() < 20 => {
                _ = async {}, if blocks_fetching.len() < args.fetch_concurrent => {
                    let bitcoin_rpc = bitcoin_rpc.clone();

                    if (height % 100) == 0 && (height - start) > 500 {
                    if (height % 100) == 0 && (height - start) > 500 && start_time.elapsed().as_secs() > 0 {
                        eprintln!("Average per tx fetched/s: {}. Current {}", (height - start) / start_time.elapsed().as_secs(), height);
                    }

@@ -91,7 +91,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                .await
                .unwrap();

            if (height % 100) == 0 && height > 500 {
            if (height % 100) == 0 && (height - start) > 500 && start_time.elapsed().as_secs() > 0 {
                eprintln!(
                    "Average processed/s: {}. Current {}",
                    height / start_time.elapsed().as_secs(),
@@ -219,6 +219,9 @@ async fn insert_transaction_input(
    transaction_id: i64,
    transaction_input: &TxIn,
) -> Result<(), Box<dyn std::error::Error>> {
    // todo: maybe we should just insert the previous output (hash, index) here and have the
    //  web-api fetch it via that index instead, that way we can parallelise our inserts to
    //  postgres
    let query = "
        INSERT INTO transaction_inputs
        (transaction_id, index, sequence, witness, script, previous_output)
@@ -296,7 +299,14 @@ pub struct Args {
    #[clap(short, long, parse(try_from_str = Config::from_toml_path))]
    pub config: Config,
    /// Block height to start at
    #[clap(short, long)]
    pub start: u64,
    /// Channel buffer between grab & push to db
    #[clap(short, long)]
    pub buffer: usize,
    /// Amount of concurrent requests to open to bitcoin rpc
    #[clap(short, long)]
    pub fetch_concurrent: usize,
}

impl Args {