Skip to content

Commit

Permalink
make database more roboust
Browse files Browse the repository at this point in the history
  • Loading branch information
Davidson-Souza committed Nov 17, 2023
1 parent 9c7e01b commit 83d736d
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 21 deletions.
60 changes: 50 additions & 10 deletions crates/floresta-chain/src/pruned_utreexo/chain_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use bitcoin::{
#[cfg(feature = "bitcoinconsensus")]
use core::ffi::c_uint;
use floresta_common::Channel;
use log::{info, trace};
use log::{info, trace, warn};
use rustreexo::accumulator::{node_hash::NodeHash, proof::Proof, stump::Stump};
use spin::RwLock;

Expand Down Expand Up @@ -523,21 +523,38 @@ impl<PersistedState: ChainStore> ChainState<PersistedState> {
/// point.
fn reindex_chain(&self) -> BestChain {
let inner = read_lock!(self);
warn!("reindexing our chain");
let mut headers = 0;
let mut fully_valid = inner
.chainstore
.get_block_hash(0)
.expect("No genesis block")
.unwrap();

let mut height = 0;
let mut hash = inner
.chainstore
.get_block_hash(0)
.expect("No genesis block")
.unwrap();
while let Ok(Some(_hash)) = inner.chainstore.get_block_hash(height) {
height += 1;
while let Ok(Some(_hash)) = inner.chainstore.get_block_hash(headers) {
let header = self.get_disk_block_header(&_hash);
match &header {
Ok(DiskBlockHeader::FullyValid(header, heigth)) => {
assert_eq!(*heigth, headers);
headers += 1;
fully_valid = header.block_hash();
}
Ok(DiskBlockHeader::HeadersOnly(_, _)) => {
headers += 1;
}
_ => break,
}
hash = _hash;
}
BestChain {
best_block: hash,
depth: height,
validation_index: hash,
depth: headers - 1,
validation_index: fully_valid,
rescan_index: None,
alternative_tips: Vec::new(),
assume_valid_index: 0,
Expand Down Expand Up @@ -575,10 +592,28 @@ impl<PersistedState: ChainStore> ChainState<PersistedState> {
inner: RwLock::new(inner),
};
// Check the integrity of our chain
chainstate.reindex_chain();
chainstate.check_chain_integrity();
Ok(chainstate)
}

fn check_chain_integrity(&self) {
let best_height = self.get_best_block().expect("should have this loaded").0;
for height in 0..=best_height {
let Ok(hash) = self.get_block_hash(height) else {
self.reindex_chain();
return;
};
match self.get_disk_block_header(&hash) {
Ok(DiskBlockHeader::FullyValid(_, _)) => continue,
Ok(DiskBlockHeader::HeadersOnly(_, _)) => continue,

_ => {
warn!("our chain is corrupted, reindexing");
self.reindex_chain();
}
}
}
}
fn load_acc<Storage: ChainStore>(data_storage: &Storage) -> Stump {
let acc = data_storage
.load_roots()
Expand Down Expand Up @@ -615,7 +650,6 @@ impl<PersistedState: ChainStore> ChainState<PersistedState> {
inner
.chainstore
.update_block_index(height, block.block_hash())?;
inner.chainstore.save_height(&inner.best_block)?;
// Updates our local view of the network
inner.acc = acc;
inner.best_block.valid_block(block.block_hash());
Expand Down Expand Up @@ -910,16 +944,19 @@ impl<PersistedState: ChainStore> UpdatableChainstate for ChainState<PersistedSta
block.block_hash()
);
self.flush()?;
self.save_acc()?;
}
if !ibd {
info!(
"New tip! hash={} height={height} tx_count={}",
block.block_hash(),
block.txdata.len()
)
);
self.flush()?;
self.save_acc()?;
}

self.update_view(height, &block.header, acc)?;
self.save_acc()?;

// Notify others we have a new block
self.notify(block, height);
Expand All @@ -945,9 +982,12 @@ impl<PersistedState: ChainStore> UpdatableChainstate for ChainState<PersistedSta
self.maybe_reindex(&_header?);
return Ok(()); // We already have this header
}
// The best block we know of
let best_block = self.get_best_block()?;

// Do validation in this header
let block_hash = self.validate_header(&header)?;

// Update our current tip
if header.prev_blockhash == best_block.1 {
let height = best_block.0 + 1;
Expand Down
16 changes: 10 additions & 6 deletions crates/floresta-wire/src/p2p_wire/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,10 +772,14 @@ where
for header in headers {
self.chain.accept_header(header)?;
}
if self.inflight.contains_key(&InflightRequests::Headers) {
return Ok(());
}
let locator = self.chain.get_block_locator()?;
let peer = self
.send_to_random_peer(NodeRequest::GetHeaders(locator), ServiceFlags::NONE)
.await?;

self.inflight
.insert(InflightRequests::Headers, (peer, Instant::now()));
Ok(())
Expand All @@ -796,22 +800,21 @@ where
}
pub async fn run(&mut self, stop_signal: &Arc<RwLock<bool>>) -> Result<(), WireError> {
self.create_connection(false).await;
self.last_headers_request = Instant::now();
loop {
while let Ok(notification) =
timeout(Duration::from_millis(10), self.node_rx.recv()).await
{
try_and_log!(self.handle_notification(notification).await);
}

if *stop_signal.read().await {
break;
}
periodic_job!(
self.maybe_open_connection().await,
self.last_connection,
TRY_NEW_CONNECTION,
IBDNode
);

if self.state == NodeState::WaitingPeer {
try_and_log!(self.maybe_open_connection().await);
}
self.last_tip_update = Instant::now();

// If we don't have any peers, then we can't do anything
Expand All @@ -833,6 +836,7 @@ where
}
})?;
}

if !self.chain.is_in_idb() {
break;
}
Expand Down
2 changes: 1 addition & 1 deletion crates/floresta-wire/src/p2p_wire/node_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub trait NodeContext {
/// If ASSUME_STALE seconds passed since our last tip update, treat it as stale
const ASSUME_STALE: u64 = 30 * 60; // 30 minutes
/// While on IBD, if we've been without blocks for this long, ask for headers again
const IBD_REQUEST_BLOCKS_AGAIN: u64 = 10; // 10 seconds
const IBD_REQUEST_BLOCKS_AGAIN: u64 = 30; // 30 seconds
/// How often we broadcast transactions
const BROADCAST_DELAY: u64 = 30; // 30 seconds
/// Max number of simultaneous inflight requests we allow
Expand Down
7 changes: 3 additions & 4 deletions crates/floresta-wire/src/p2p_wire/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use bitcoin::{
BlockHash, BlockHeader, Network, Transaction,
};
use futures::{AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt};
use log::{debug, error, info, warn};
use log::{debug, error,warn};
use std::{
fmt::Debug,
sync::Arc,
Expand Down Expand Up @@ -306,7 +306,7 @@ impl<T: Transport> Peer<T> {
.await;
}
_ => {
info!("Unexpected message: {:?}", message.payload);
warn!("unexpected message: {:?} from peer {}", message.payload, self.id);
return Err(PeerError::UnexpectedMessage);
}
},
Expand All @@ -322,8 +322,7 @@ impl<T: Transport> Peer<T> {
}
bitcoin::network::message::NetworkMessage::WtxidRelay => {}
_ => {
info!("==>Unexpected message: {:?}", message.payload);

warn!("unexpected message: {:?} from peer {}", message.payload, self.id);
return Err(PeerError::UnexpectedMessage);
}
},
Expand Down

0 comments on commit 83d736d

Please sign in to comment.