From fe03f7c8e47ae3e69e00223ab7b46a2f24c3234f Mon Sep 17 00:00:00 2001
From: Jordan Doyle
Date: Wed, 18 May 2022 00:02:42 +0100
Subject: [PATCH] Implement previous output corroboration including address lookup

---
 indexer/src/database.rs              |  1 -
 indexer/src/main.rs                  | 53 ++++++++++++++++++++++++++++++++++++++++++-------------
 migrations/up/V1__initial_schema.sql |  3 ++-
 3 files changed, 42 insertions(+), 15 deletions(-)

diff --git a/indexer/src/database.rs b/indexer/src/database.rs
index 69e1579..8e7784f 100644
--- a/indexer/src/database.rs
+++ b/indexer/src/database.rs
@@ -1,7 +1,6 @@
 use crate::DatabaseConfig;
 use deadpool_postgres::{Config, CreatePoolError, ManagerConfig, RecyclingMethod, Runtime};
 use refinery::embed_migrations;
-use serde::Deserialize;
 use std::{ops::Deref, sync::Arc};
 use tokio_postgres::NoTls;
 
diff --git a/indexer/src/main.rs b/indexer/src/main.rs
index e71f86a..c96d508 100644
--- a/indexer/src/main.rs
+++ b/indexer/src/main.rs
@@ -1,3 +1,5 @@
+extern crate core;
+
 mod config;
 mod database;
 
@@ -11,8 +13,6 @@ use bitcoincore_rpc_async::{
     RpcApi,
 };
 use chrono::{TimeZone, Utc};
 use clap::Parser;
-use futures::stream::FuturesUnordered;
-use toml::map;
 use tracing::Level;
 
 #[tokio::main]
@@ -65,11 +65,9 @@ pub async fn process_block(
     let transaction_id = insert_transaction(tx, block_id, transaction).await?;
 
     futures::future::try_join(
-        futures::future::try_join_all(transaction.input.iter().enumerate().map(
-            |(index, transaction_in)| {
-                insert_transaction_input(tx, transaction_id, transaction_in)
-            },
-        )),
+        futures::future::try_join_all(transaction.input.iter().map(|transaction_in| {
+            insert_transaction_input(tx, transaction_id, transaction_in)
+        })),
         futures::future::try_join_all(transaction.output.iter().enumerate().map(
             |(index, transaction_out)| {
                 insert_transaction_output(
@@ -160,22 +158,26 @@ async fn insert_transaction_input(
     transaction_id: i64,
     transaction_input: &TxIn,
 ) -> Result<(), Box<dyn std::error::Error>> {
-    // TODO: this needs to deal with previous_outputs...
-    // transaction_output.previous_output
+    let previous_output = select_transaction_output(
+        tx,
+        &transaction_input.previous_output.txid.to_vec(),
+        transaction_input.previous_output.vout as i64,
+    )
+    .await?;
 
     let query = "
         INSERT INTO transaction_inputs
             (transaction_id, previous_output, script, address)
         VALUES ($1, $2, $3, $4)
     ";
 
-    tx.query_one(
+    tx.execute(
         query,
         &[
             &transaction_id,
-            &Option::<i64>::None,
+            &previous_output.as_ref().map(|(id, _)| *id),
             &transaction_input.script_sig.as_bytes(),
-            &Option::<String>::None,
+            &previous_output.map(|(_, address)| address),
         ],
     )
     .await?;
@@ -195,7 +197,7 @@ async fn insert_transaction_output(
         VALUES ($1, $2, $3, $4, $5, $6)
     ";
 
-    tx.query_one(
+    tx.execute(
         query,
         &[
             &transaction_id,
@@ -212,6 +214,31 @@ async fn insert_transaction_output(
     Ok(())
 }
 
+// TODO: this is a _very_ efficient query involving just two index scans, but right now we insert
+// transaction_inputs alongside transaction_outputs concurrently, so the output an input spends may
+// not exist yet - inserts would need to be sequential for this lookup to be reliable. maybe we can
+// just call this query on-demand afterwards? sequentialising inserts is quite risky to insert speed.
+async fn select_transaction_output(
+    tx: &tokio_postgres::Transaction<'_>,
+    transaction_hash: &[u8],
+    transaction_index: i64,
+) -> Result<Option<(i64, String)>, Box<dyn std::error::Error>> {
+    let query = "
+        SELECT transaction_outputs.id AS output_id, address
+        FROM transactions
+        INNER JOIN transaction_outputs
+            ON transactions.id = transaction_outputs.transaction_id
+        WHERE transactions.hash = $1
+            AND transaction_outputs.index = $2
+    ";
+
+    let row = tx
+        .query_opt(query, &[&transaction_hash, &transaction_index])
+        .await?;
+
+    Ok(row.map(|v| (v.get("output_id"), v.get("address"))))
+}
+
 #[derive(Parser, Debug)]
 #[clap(version, about, long_about = None)]
 pub struct Args {
diff --git a/migrations/up/V1__initial_schema.sql b/migrations/up/V1__initial_schema.sql
index 8be4d6b..dda7f88 100644
--- a/migrations/up/V1__initial_schema.sql
+++ b/migrations/up/V1__initial_schema.sql
@@ -17,7 +17,7 @@ CREATE TABLE blocks (
 
 CREATE TABLE transactions (
     id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
-    hash BYTEA NOT NULL,
+    hash BYTEA UNIQUE NOT NULL,
     block_id BIGINT,
     version INT NOT NULL,
     lock_time INT NOT NULL,
@@ -42,6 +42,7 @@ CREATE TABLE transaction_outputs (
         REFERENCES transactions(id)
 );
 
+CREATE INDEX transaction_outputs_txid_index ON transaction_outputs (transaction_id, index);
 
 CREATE TABLE transaction_inputs (
     id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
-- 
libgit2 1.7.2
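
One possible shape for the "call this query on-demand" idea from the TODO, sketched under stated assumptions rather than as part of the patch: skip the lookup during the initial concurrent inserts (leaving previous_output and address NULL), then resolve everything in a single set-based UPDATE once the block's outputs have committed. backfill_previous_outputs is a hypothetical helper, and it assumes transaction_inputs also stores the raw outpoint in two assumed columns (previous_output_hash BYTEA, previous_output_index BIGINT) that the V1 schema does not currently have.

// Hypothetical on-demand backfill, not part of this patch. Assumes
// transaction_inputs carries the raw outpoint (previous_output_hash BYTEA,
// previous_output_index BIGINT) so unresolved inputs can be joined back to
// their previous outputs after all concurrent inserts have committed.
async fn backfill_previous_outputs(
    tx: &tokio_postgres::Transaction<'_>,
) -> Result<u64, Box<dyn std::error::Error>> {
    // Set-based version of select_transaction_output: one UPDATE resolves
    // every input whose previous output now exists, driven by the same two
    // indexes (the UNIQUE index on transactions.hash and
    // transaction_outputs_txid_index).
    let query = "
        UPDATE transaction_inputs
        SET previous_output = transaction_outputs.id,
            address = transaction_outputs.address
        FROM transactions
        INNER JOIN transaction_outputs
            ON transactions.id = transaction_outputs.transaction_id
        WHERE transactions.hash = transaction_inputs.previous_output_hash
            AND transaction_outputs.index = transaction_inputs.previous_output_index
            AND transaction_inputs.previous_output IS NULL
    ";

    // `execute` returns the number of rows updated, i.e. inputs resolved.
    Ok(tx.execute(query, &[]).await?)
}

Called once per block (or per batch of blocks) after process_block's try_join completes, this would keep the hot insert path fully concurrent at the cost of a second pass over unresolved inputs.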