From d5e06c56fc5a3b9323382927dce9db3df20a0191 Mon Sep 17 00:00:00 2001 From: Jordan Doyle Date: Wed, 17 Jan 2024 22:57:22 +0000 Subject: [PATCH] Write in batches to RocksDB --- Cargo.lock | 16 ++++++++++++++++ Cargo.toml | 1 + src/database/indexer.rs | 49 ++++++++++++++++++++++++++++++++++++++----------- src/database/schema/commit.rs | 18 ++++++++++-------- 4 files changed, 56 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 91ec96e..fa82650 100644 --- a/Cargo.lock +++ a/Cargo.lock @@ -771,6 +771,12 @@ ] [[package]] +name = "either" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" + +[[package]] name = "encode_unicode" version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1184,6 +1190,15 @@ dependencies = [ "equivalent", "hashbrown", +] + +[[package]] +name = "itertools" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25db6b064527c5d482d0423354fcd07a89a2dfe07b67892e62411946db7f07b0" +dependencies = [ + "either", ] [[package]] @@ -1905,6 +1920,7 @@ "hex", "httparse", "humantime", + "itertools", "md5", "moka", "nom", diff --git a/Cargo.toml b/Cargo.toml index 567fcfd..84987cd 100644 --- a/Cargo.toml +++ a/Cargo.toml @@ -22,6 +22,7 @@ git2 = "0.18.0" hex = "0.4" humantime = "2.1" +itertools = "0.12" rust-ini = "0.20" nom = "7.1" md5 = "0.7" diff --git a/src/database/indexer.rs b/src/database/indexer.rs index 92d9532..3469009 100644 --- a/src/database/indexer.rs +++ a/src/database/indexer.rs @@ -10,6 +10,8 @@ use anyhow::Context; use git2::{ErrorCode, Reference, Sort}; use ini::Ini; +use itertools::Itertools; +use rocksdb::WriteBatch; use time::OffsetDateTime; use tracing::{error, info, info_span, instrument, warn}; @@ -212,29 +214,40 @@ let tree_len = commit_tree.len()?; let mut seen = false; let mut i = 0; - for rev in revwalk { - let rev = rev?; - - if let (false, Some(latest_indexed)) = (seen, &latest_indexed) { - if rev.as_bytes() == &*latest_indexed.get().hash { - seen = true; + for revs in &revwalk.chunks(250) { + let mut batch = WriteBatch::default(); + + for rev in revs { + let rev = rev?; + + if let (false, Some(latest_indexed)) = (seen, &latest_indexed) { + if rev.as_bytes() == &*latest_indexed.get().hash { + seen = true; + } + + continue; } - continue; - } + seen = true; - seen = true; + if ((i + 1) % 25_000) == 0 { + info!("{} commits ingested", i + 1); + } - if ((i + 1) % 25_000) == 0 { - info!("{} commits ingested", i + 1); + let commit = git_repository.find_commit(rev)?; + let author = commit.author(); + let committer = commit.committer(); + + Commit::new(&commit, &author, &committer).insert( + &commit_tree, + tree_len + i, + &mut batch, + )?; + i += 1; } - let commit = git_repository.find_commit(rev)?; - let author = commit.author(); - let committer = commit.committer(); - - Commit::new(&commit, &author, &committer).insert(&commit_tree, tree_len + i)?; - i += 1; + commit_tree.update_counter(tree_len + i, &mut batch)?; + db.write_without_wal(batch)?; } if !seen && !force_reindex { @@ -250,8 +263,6 @@ true, ); } - - commit_tree.update_counter(tree_len + i)?; Ok(()) } diff --git a/src/database/schema/commit.rs b/src/database/schema/commit.rs index 4c60a2e..98b56a8 100644 --- a/src/database/schema/commit.rs +++ a/src/database/schema/commit.rs @@ -1,8 +1,8 @@ use std::{borrow::Cow, ops::Deref, sync::Arc}; use anyhow::Context; use git2::{Oid, Signature}; -use rocksdb::{IteratorMode, ReadOptions}; +use rocksdb::{IteratorMode, ReadOptions, WriteBatch}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use time::OffsetDateTime; use tracing::debug; @@ -44,8 +44,8 @@ } } - pub fn insert(&self, batch: &CommitTree, id: u64) -> anyhow::Result<()> { - batch.insert(id, self) + pub fn insert(&self, tree: &CommitTree, id: u64, tx: &mut WriteBatch) -> anyhow::Result<()> { + tree.insert(id, self, tx) } } @@ -72,8 +72,8 @@ S: Serializer, { match self { - CommitHash::Oid(v) => v.as_bytes().serialize(serializer), - CommitHash::Bytes(v) => v.serialize(serializer), + CommitHash::Oid(v) => serializer.serialize_bytes(v.as_bytes()), + CommitHash::Bytes(v) => serializer.serialize_bytes(v), } } } @@ -147,13 +147,13 @@ Ok(()) } - pub fn update_counter(&self, count: u64) -> anyhow::Result<()> { + pub fn update_counter(&self, count: u64, tx: &mut WriteBatch) -> anyhow::Result<()> { let cf = self .db .cf_handle(COMMIT_COUNT_FAMILY) .context("missing column family")?; - self.db.put_cf(cf, &self.prefix, count.to_be_bytes())?; + tx.put_cf(cf, &self.prefix, count.to_be_bytes()); Ok(()) } @@ -173,7 +173,7 @@ Ok(u64::from_be_bytes(out)) } - fn insert(&self, id: u64, commit: &Commit<'_>) -> anyhow::Result<()> { + fn insert(&self, id: u64, commit: &Commit<'_>, tx: &mut WriteBatch) -> anyhow::Result<()> { let cf = self .db .cf_handle(COMMIT_FAMILY) @@ -182,7 +182,7 @@ let mut key = self.prefix.to_vec(); key.extend_from_slice(&id.to_be_bytes()); - self.db.put_cf(cf, key, bincode::serialize(commit)?)?; + tx.put_cf(cf, key, bincode::serialize(commit)?); Ok(()) } -- rgit 0.1.3