Write in batches to RocksDB
Diff
Cargo.lock | 16 ++++++++++++++++
Cargo.toml | 1 +
src/database/indexer.rs | 49 ++++++++++++++++++++++++++++++++++++++-----------
src/database/schema/commit.rs | 18 ++++++++++--------
4 files changed, 56 insertions(+), 28 deletions(-)
@@ -771,6 +771,12 @@
]
[[package]]
name = "either"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07"
[[package]]
name = "encode_unicode"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1184,6 +1190,15 @@
dependencies = [
"equivalent",
"hashbrown",
]
[[package]]
name = "itertools"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0"
dependencies = [
"either",
]
[[package]]
@@ -1905,6 +1920,7 @@
"hex",
"httparse",
"humantime",
"itertools",
"md5",
"moka",
"nom",
@@ -22,6 +22,7 @@
git2 = "0.18.0"
hex = "0.4"
humantime = "2.1"
itertools = "0.12"
rust-ini = "0.20"
nom = "7.1"
md5 = "0.7"
@@ -10,6 +10,8 @@
use anyhow::Context;
use git2::{ErrorCode, Reference, Sort};
use ini::Ini;
use itertools::Itertools;
use rocksdb::WriteBatch;
use time::OffsetDateTime;
use tracing::{error, info, info_span, instrument, warn};
@@ -212,29 +214,40 @@
let tree_len = commit_tree.len()?;
let mut seen = false;
let mut i = 0;
for rev in revwalk {
let rev = rev?;
if let (false, Some(latest_indexed)) = (seen, &latest_indexed) {
if rev.as_bytes() == &*latest_indexed.get().hash {
seen = true;
for revs in &revwalk.chunks(250) {
let mut batch = WriteBatch::default();
for rev in revs {
let rev = rev?;
if let (false, Some(latest_indexed)) = (seen, &latest_indexed) {
if rev.as_bytes() == &*latest_indexed.get().hash {
seen = true;
}
continue;
}
continue;
}
seen = true;
seen = true;
if ((i + 1) % 25_000) == 0 {
info!("{} commits ingested", i + 1);
}
if ((i + 1) % 25_000) == 0 {
info!("{} commits ingested", i + 1);
let commit = git_repository.find_commit(rev)?;
let author = commit.author();
let committer = commit.committer();
Commit::new(&commit, &author, &committer).insert(
&commit_tree,
tree_len + i,
&mut batch,
)?;
i += 1;
}
let commit = git_repository.find_commit(rev)?;
let author = commit.author();
let committer = commit.committer();
Commit::new(&commit, &author, &committer).insert(&commit_tree, tree_len + i)?;
i += 1;
commit_tree.update_counter(tree_len + i, &mut batch)?;
db.write_without_wal(batch)?;
}
if !seen && !force_reindex {
@@ -250,8 +263,6 @@
true,
);
}
commit_tree.update_counter(tree_len + i)?;
Ok(())
}
@@ -1,8 +1,8 @@
use std::{borrow::Cow, ops::Deref, sync::Arc};
use anyhow::Context;
use git2::{Oid, Signature};
use rocksdb::{IteratorMode, ReadOptions};
use rocksdb::{IteratorMode, ReadOptions, WriteBatch};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use time::OffsetDateTime;
use tracing::debug;
@@ -44,8 +44,8 @@
}
}
pub fn insert(&self, batch: &CommitTree, id: u64) -> anyhow::Result<()> {
batch.insert(id, self)
pub fn insert(&self, tree: &CommitTree, id: u64, tx: &mut WriteBatch) -> anyhow::Result<()> {
tree.insert(id, self, tx)
}
}
@@ -72,8 +72,8 @@
S: Serializer,
{
match self {
CommitHash::Oid(v) => v.as_bytes().serialize(serializer),
CommitHash::Bytes(v) => v.serialize(serializer),
CommitHash::Oid(v) => serializer.serialize_bytes(v.as_bytes()),
CommitHash::Bytes(v) => serializer.serialize_bytes(v),
}
}
}
@@ -147,13 +147,13 @@
Ok(())
}
pub fn update_counter(&self, count: u64) -> anyhow::Result<()> {
pub fn update_counter(&self, count: u64, tx: &mut WriteBatch) -> anyhow::Result<()> {
let cf = self
.db
.cf_handle(COMMIT_COUNT_FAMILY)
.context("missing column family")?;
self.db.put_cf(cf, &self.prefix, count.to_be_bytes())?;
tx.put_cf(cf, &self.prefix, count.to_be_bytes());
Ok(())
}
@@ -173,7 +173,7 @@
Ok(u64::from_be_bytes(out))
}
fn insert(&self, id: u64, commit: &Commit<'_>) -> anyhow::Result<()> {
fn insert(&self, id: u64, commit: &Commit<'_>, tx: &mut WriteBatch) -> anyhow::Result<()> {
let cf = self
.db
.cf_handle(COMMIT_FAMILY)
@@ -182,7 +182,7 @@
let mut key = self.prefix.to_vec();
key.extend_from_slice(&id.to_be_bytes());
self.db.put_cf(cf, key, bincode::serialize(commit)?)?;
tx.put_cf(cf, key, bincode::serialize(commit)?);
Ok(())
}