🏡 index : ~doyle/blocks.ls.git

author Jordan Doyle <jordan@doyle.la> 2022-05-17 23:02:42.0 +00:00:00
committer Jordan Doyle <jordan@doyle.la> 2022-05-17 23:02:42.0 +00:00:00
commit
fe03f7c8e47ae3e69e00223ab7b46a2f24c3234f [patch]
tree
8226561e29b89c4345bd55397dfd7485d8135ffb
parent
8b0ceff5d6444f4cd6f8401d0ac26bdc5d5c6a93
download
fe03f7c8e47ae3e69e00223ab7b46a2f24c3234f.tar.gz

Implement previous output corroboration including address lookup



Diff

 indexer/src/database.rs              |  1 +-
 indexer/src/main.rs                  | 55 +++++++++++++++++++++++++++----------
 migrations/up/V1__initial_schema.sql |  3 +-
 3 files changed, 43 insertions(+), 16 deletions(-)

diff --git a/indexer/src/database.rs b/indexer/src/database.rs
index 69e1579..8e7784f 100644
--- a/indexer/src/database.rs
+++ b/indexer/src/database.rs
@@ -1,7 +1,6 @@
use crate::DatabaseConfig;
use deadpool_postgres::{Config, CreatePoolError, ManagerConfig, RecyclingMethod, Runtime};
use refinery::embed_migrations;
use serde::Deserialize;
use std::{ops::Deref, sync::Arc};
use tokio_postgres::NoTls;

diff --git a/indexer/src/main.rs b/indexer/src/main.rs
index e71f86a..c96d508 100644
--- a/indexer/src/main.rs
+++ b/indexer/src/main.rs
@@ -1,3 +1,5 @@
extern crate core;

mod config;
mod database;

@@ -11,8 +13,6 @@ use bitcoincore_rpc_async::{
};
use chrono::{TimeZone, Utc};
use clap::Parser;
use futures::stream::FuturesUnordered;
use toml::map;
use tracing::Level;

#[tokio::main]
@@ -65,11 +65,9 @@ pub async fn process_block(
                let transaction_id = insert_transaction(tx, block_id, transaction).await?;

                futures::future::try_join(
                    futures::future::try_join_all(transaction.input.iter().enumerate().map(
                        |(index, transaction_in)| {
                            insert_transaction_input(tx, transaction_id, transaction_in)
                        },
                    )),
                    futures::future::try_join_all(transaction.input.iter().map(|transaction_in| {
                        insert_transaction_input(tx, transaction_id, transaction_in)
                    })),
                    futures::future::try_join_all(transaction.output.iter().enumerate().map(
                        |(index, transaction_out)| {
                            insert_transaction_output(
@@ -160,22 +158,26 @@ async fn insert_transaction_input(
    transaction_id: i64,
    transaction_input: &TxIn,
) -> Result<(), Box<dyn std::error::Error>> {
    // TODO: this needs to deal with previous_outputs...
    // transaction_output.previous_output
    let previous_output = select_transaction_output(
        tx,
        &transaction_input.previous_output.txid.to_vec(),
        transaction_input.previous_output.vout as i64,
    )
    .await?;

    let query = "
        INSERT INTO transaction_inputs
        INSERT INTO transaction_outputs
        (transaction_id, previous_output, script, address)
        VALUES ($1, $2, $3, $4)
    ";

    tx.query_one(
    tx.execute(
        query,
        &[
            &transaction_id,
            &Option::<i64>::None,
            &previous_output.as_ref().map(|(id, _)| *id),
            &transaction_input.script_sig.as_bytes(),
            &Option::<String>::None,
            &previous_output.map(|(_, address)| address),
        ],
    )
    .await?;
@@ -195,7 +197,7 @@ async fn insert_transaction_output(
        VALUES ($1, $2, $3, $4, $5, $6)
    ";

    tx.query_one(
    tx.execute(
        query,
        &[
            &transaction_id,
@@ -212,6 +214,31 @@ async fn insert_transaction_output(
    Ok(())
}

// TODO: this is a _very_ efficient query involving just two index scans, right now we're inserting
//  it alongside transaction_outputs, but we need sequential inserts for that to work. maybe we can
//  just call this query on-demand? or figure out a way to sequentialise inserts - that's quite risky
//  to our insert speed though.
async fn select_transaction_output(
    tx: &tokio_postgres::Transaction<'_>,
    transaction_hash: &[u8],
    transaction_index: i64,
) -> Result<Option<(i64, String)>, Box<dyn std::error::Error>> {
    let query = "
        SELECT transaction_outputs.id AS output_id, address
        FROM transactions
        INNER JOIN transaction_outputs
            ON transactions.id = transaction_outputs.transaction_id
        WHERE transactions.hash = $1
            AND transaction_outputs.index = $2
    ";

    let row = tx
        .query_opt(query, &[&transaction_hash, &transaction_index])
        .await?;

    Ok(row.map(|v| (v.get("output_id"), v.get("address"))))
}

#[derive(Parser, Debug)]
#[clap(version, about, long_about = None)]
pub struct Args {
diff --git a/migrations/up/V1__initial_schema.sql b/migrations/up/V1__initial_schema.sql
index 8be4d6b..dda7f88 100644
--- a/migrations/up/V1__initial_schema.sql
+++ b/migrations/up/V1__initial_schema.sql
@@ -17,7 +17,7 @@ CREATE TABLE blocks (

CREATE TABLE transactions (
    id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
    hash BYTEA NOT NULL,
    hash BYTEA UNIQUE NOT NULL,
    block_id BIGINT,
    version INT NOT NULL,
    lock_time INT NOT NULL,
@@ -42,6 +42,7 @@ CREATE TABLE transaction_outputs (
            REFERENCES transactions(id)
);

CREATE INDEX transaction_outputs_txid_index ON transaction_outputs (transaction_id, index);

CREATE TABLE transaction_inputs (
    id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,