Implement previous output corroboration including address lookup
Diff
indexer/src/database.rs | 1 -
indexer/src/main.rs | 55 ++++++++++++++++++++++++++++++++++++++++++-------------
migrations/up/V1__initial_schema.sql | 3 ++-
3 files changed, 43 insertions(+), 16 deletions(-)
@@ -1,7 +1,6 @@
use crate::DatabaseConfig;
use deadpool_postgres::{Config, CreatePoolError, ManagerConfig, RecyclingMethod, Runtime};
use refinery::embed_migrations;
use serde::Deserialize;
use std::{ops::Deref, sync::Arc};
use tokio_postgres::NoTls;
@@ -1,3 +1,5 @@
extern crate core;
mod config;
mod database;
@@ -11,8 +13,6 @@
};
use chrono::{TimeZone, Utc};
use clap::Parser;
use futures::stream::FuturesUnordered;
use toml::map;
use tracing::Level;
#[tokio::main]
@@ -65,11 +65,9 @@
let transaction_id = insert_transaction(tx, block_id, transaction).await?;
futures::future::try_join(
futures::future::try_join_all(transaction.input.iter().enumerate().map(
|(index, transaction_in)| {
insert_transaction_input(tx, transaction_id, transaction_in)
},
)),
futures::future::try_join_all(transaction.input.iter().map(|transaction_in| {
insert_transaction_input(tx, transaction_id, transaction_in)
})),
futures::future::try_join_all(transaction.output.iter().enumerate().map(
|(index, transaction_out)| {
insert_transaction_output(
@@ -160,22 +158,26 @@
transaction_id: i64,
transaction_input: &TxIn,
) -> Result<(), Box<dyn std::error::Error>> {
let previous_output = select_transaction_output(
tx,
&transaction_input.previous_output.txid.to_vec(),
transaction_input.previous_output.vout as i64,
)
.await?;
let query = "
INSERT INTO transaction_inputs
INSERT INTO transaction_outputs
(transaction_id, previous_output, script, address)
VALUES ($1, $2, $3, $4)
";
tx.query_one(
tx.execute(
query,
&[
&transaction_id,
&Option::<i64>::None,
&previous_output.as_ref().map(|(id, _)| *id),
&transaction_input.script_sig.as_bytes(),
&Option::<String>::None,
&previous_output.map(|(_, address)| address),
],
)
.await?;
@@ -195,7 +197,7 @@
VALUES ($1, $2, $3, $4, $5, $6)
";
tx.query_one(
tx.execute(
query,
&[
&transaction_id,
@@ -210,6 +212,31 @@
.await?;
Ok(())
}
async fn select_transaction_output(
tx: &tokio_postgres::Transaction<'_>,
transaction_hash: &[u8],
transaction_index: i64,
) -> Result<Option<(i64, String)>, Box<dyn std::error::Error>> {
let query = "
SELECT transaction_outputs.id AS output_id, address
FROM transactions
INNER JOIN transaction_outputs
ON transactions.id = transaction_outputs.transaction_id
WHERE transactions.hash = $1
AND transaction_outputs.index = $2
";
let row = tx
.query_opt(query, &[&transaction_hash, &transaction_index])
.await?;
Ok(row.map(|v| (v.get("output_id"), v.get("address"))))
}
#[derive(Parser, Debug)]
@@ -17,7 +17,7 @@
CREATE TABLE transactions (
id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
hash BYTEA NOT NULL,
hash BYTEA UNIQUE NOT NULL,
block_id BIGINT,
version INT NOT NULL,
lock_time INT NOT NULL,
@@ -42,6 +42,7 @@
REFERENCES transactions(id)
);
CREATE INDEX transaction_outputs_txid_index ON transaction_outputs (transaction_id, index);
CREATE TABLE transaction_inputs (
id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,