Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improves reorg logic by checking whether we are on sync with the backend or not #235

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 52 additions & 12 deletions teos/src/carrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,23 @@ impl Carrier {
ConfirmationStatus::Rejected(rpc_errors::RPC_VERIFY_ERROR)
}
rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN => {
log::info!(
"Transaction was confirmed long ago, not keeping track of it: {}",
tx.txid()
);

// Given we are not using txindex, if a transaction bounces we cannot get its confirmation count. However, [send_transaction] is guarded by
// checking whether the transaction id can be found in the [Responder]'s [TxIndex], meaning that if the transaction bounces it was confirmed long
// ago (> IRREVOCABLY_RESOLVED), so we don't need to worry about it.
ConfirmationStatus::IrrevocablyResolved
if self.bitcoin_cli.get_block_count().unwrap() as u32 > self.block_height {
// We are out of sync, either we are trying to send things to bitcoind after a reorg (and we have not reached the new tip yet)
// or a block was found right when we were trying to send something and we have not yet processed it.
// In both cases we don't know if what we are trying to send is really old (IRREVOCABLY_RESOLVED) or if it'll just be processed
// in our way up to the new tip (we need to make this work both for txindex and prune mode, so getrawtransaction is not an option)
ConfirmationStatus::OffSync
} else {
log::info!(
"Transaction was confirmed long ago, not keeping track of it: {}",
tx.txid()
);

// Given we are not using txindex, if a transaction bounces we cannot get its confirmation count. However, [send_transaction] is guarded by
// checking whether the transaction id can be found in the [Responder]'s [TxIndex], and we know we are on sync, so he transaction
// must have confirmed a long ago (> IRREVOCABLY_RESOLVED). Therefore, we don't need to worry about it anymore.
ConfirmationStatus::IrrevocablyResolved
}
}
rpc_errors::RPC_DESERIALIZATION_ERROR => {
// Adding this here just for completeness. We should never end up here. The Carrier only sends txs handed by the Responder,
Expand Down Expand Up @@ -311,12 +319,44 @@ mod tests {

#[test]
fn test_send_transaction_verify_already_in_chain() {
let bitcoind_mock = BitcoindMock::new(MockOptions::with_error(
rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN as i64,
));
let start_height = START_HEIGHT as u32;
// Set the backend to be one block ahead of us
let bitcoind_mock = BitcoindMock::new(
MockOptions::with_error(rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN as i64)
.at_height(start_height + 1),
);
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap());
start_server(bitcoind_mock.server);

let mut carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height);
let tx = consensus::deserialize(&Vec::from_hex(TX_HEX).unwrap()).unwrap();
let r = carrier.send_transaction(&tx);

// We are offsync, so the transaction should bounce but tell us about it
assert_eq!(r, ConfirmationStatus::OffSync);
assert_eq!(carrier.issued_receipts.get(&tx.txid()).unwrap(), &r);

// Try again, but this time being onsync, now we should get an IrrevocablyResolved
// We first need to clear the issued_receipts
carrier.issued_receipts.remove(&tx.txid());
// And either increase our height
carrier.block_height += 1;

let r = carrier.send_transaction(&tx);
assert_eq!(r, ConfirmationStatus::IrrevocablyResolved);
assert_eq!(carrier.issued_receipts.get(&tx.txid()).unwrap(), &r);
}

#[test]
fn test_send_transaction_verify_already_in_chain_offsync() {
let start_height = START_HEIGHT as u32;
let bitcoind_mock = BitcoindMock::new(
MockOptions::with_error(rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN as i64)
.at_height(start_height),
);
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap());
start_server(bitcoind_mock.server);

let mut carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height);
Expand Down
43 changes: 31 additions & 12 deletions teos/src/responder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub enum ConfirmationStatus {
ConfirmedIn(u32),
InMempoolSince(u32),
IrrevocablyResolved,
OffSync,
Rejected(i32),
ReorgedOut,
}
Expand Down Expand Up @@ -70,6 +71,16 @@ impl ConfirmationStatus {
ConfirmationStatus::ConfirmedIn(_) | &ConfirmationStatus::InMempoolSince(_)
)
}

/// Whether the transaction was rejected
pub fn rejected(&self) -> bool {
matches!(self, ConfirmationStatus::Rejected(_))
}

/// Whether the transaction couldn't be processed because we are off sync.
pub fn off_sync(&self) -> bool {
matches!(self, ConfirmationStatus::OffSync)
}
}

/// Minimal data required in memory to keep track of transaction trackers.
Expand Down Expand Up @@ -222,7 +233,7 @@ impl Responder {
carrier.send_transaction(&breach.penalty_tx)
};

if status.accepted() {
if status.accepted() || status.off_sync() {
self.add_tracker(uuid, breach, user_id, status);
}

Expand Down Expand Up @@ -348,17 +359,19 @@ impl Responder {
) -> HashMap<UUID, (Transaction, Option<Transaction>)> {
let dbm = self.dbm.lock().unwrap();
let mut tx_to_rebroadcast = HashMap::new();
let mut tracker: TransactionTracker;

for (uuid, t) in self.trackers.lock().unwrap().iter() {
if let ConfirmationStatus::InMempoolSince(h) = t.status {
if (height - h) as u8 >= CONFIRMATIONS_BEFORE_RETRY {
tracker = dbm.load_tracker(*uuid).unwrap();
tx_to_rebroadcast.insert(*uuid, (tracker.penalty_tx, None));
tx_to_rebroadcast
.insert(*uuid, (dbm.load_tracker(*uuid).unwrap().penalty_tx, None));
}
} else if let ConfirmationStatus::ReorgedOut = t.status {
tracker = dbm.load_tracker(*uuid).unwrap();
let tracker = dbm.load_tracker(*uuid).unwrap();
tx_to_rebroadcast.insert(*uuid, (tracker.penalty_tx, Some(tracker.dispute_tx)));
} else if t.status.off_sync() {
tx_to_rebroadcast
.insert(*uuid, (dbm.load_tracker(*uuid).unwrap().penalty_tx, None));
}
}

Expand Down Expand Up @@ -413,13 +426,15 @@ impl Responder {
if tx_index.contains_key(&dispute_tx.txid())
| carrier.in_mempool(&dispute_tx.txid())
{
// Dispute tx is on chain (or mempool), so we only need to care about the penalty
// Dispute tx is on chain (or mempool), so we only need to care about the penalty.
// We know for a fact that the penalty is not in the index, because otherwise it would have been received
// a confirmation during the processing of the current block (hence it would not have been passed to this method)
carrier.send_transaction(&penalty_tx)
} else {
// Dispute tx has also been reorged out, meaning that both transactions need to be broadcast.
// DISCUSS: For lightning transactions, if the dispute has been reorged the penalty cannot make it to the network.
// If we keep this general, the dispute can simply be a trigger and the penalty doesn't necessarily have to spend from it.
// We'll keel it lightning specific, at least for now.
// We'll keep it lightning specific, at least for now.
let status = carrier.send_transaction(&dispute_tx);
if let ConfirmationStatus::Rejected(e) = status {
log::error!(
Expand All @@ -428,14 +443,15 @@ impl Responder {
);
status
} else {
// The dispute was accepted, so we can rebroadcast the penalty.
// If the dispute is not rejected we can send the penalty.
// Notice this covers both the dispute being accepted or bouncing, given bouncing will mean it is already on chain.
carrier.send_transaction(&penalty_tx)
}
}
} else {
// The tracker has simply reached CONFIRMATIONS_BEFORE_RETRY missed confirmations.
log::warn!(
"Penalty transaction has missed many confirmations: {}",
"Penalty transaction has missed many confirmations or was sent while we were off-sync with the backend: {}",
penalty_tx.txid()
);
carrier.send_transaction(&penalty_tx)
Expand All @@ -444,11 +460,14 @@ impl Responder {
if let ConfirmationStatus::Rejected(_) = status {
rejected.insert(uuid);
} else {
// Update the tracker if it gets accepted. This will also update the height (since when we are counting the tracker
// to have been in mempool), so it resets the wait period instead of trying to rebroadcast every block.
// Update the status if the tracker is not rejected. This will update the height for InMempooolSince, resetting the
// missed confirmation counter, or flag it as OffSync if we happen to not be on sync.
// DISCUSS: We may want to find another approach in the future for the InMempoool transactions.
trackers.get_mut(&uuid).unwrap().status = status;
accepted.insert(uuid, status);

if status.accepted() {
accepted.insert(uuid, status);
}
}
}

Expand Down
15 changes: 15 additions & 0 deletions teos/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,30 +538,39 @@ pub(crate) struct BitcoindMock {

#[derive(Default)]
pub(crate) struct MockOptions {
height: u32,
error_code: Option<i64>,
in_mempool: bool,
}

impl MockOptions {
pub fn with_error(error_code: i64) -> Self {
Self {
height: 0,
error_code: Some(error_code),
in_mempool: false,
}
}

pub fn in_mempool() -> Self {
Self {
height: 0,
error_code: None,
in_mempool: true,
}
}

pub fn at_height(self, h: u32) -> Self {
Self { height: h, ..self }
}
}

impl BitcoindMock {
pub fn new(options: MockOptions) -> Self {
let mut io = IoHandler::default();

BitcoindMock::add_getblockcount(&mut io, options.height);

if let Some(error) = options.error_code {
io.add_sync_method("error", move |_params: Params| {
Err(JsonRpcError::new(JsonRpcErrorCode::ServerError(error)))
Expand Down Expand Up @@ -614,6 +623,12 @@ impl BitcoindMock {
})
}

fn add_getblockcount(io: &mut IoHandler, h: u32) {
io.add_sync_method("getblockcount", move |_params: Params| {
Ok(Value::Number(h.into()))
});
}

pub fn url(&self) -> &str {
&self.url
}
Expand Down
14 changes: 9 additions & 5 deletions teos/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,11 +700,15 @@ impl chain::Listen for Watcher {
for (uuid, breach) in valid_breaches {
log::info!("Notifying Responder and deleting appointment (uuid: {uuid})");

if let ConfirmationStatus::Rejected(_) = self.responder.handle_breach(
uuid,
breach,
self.appointments.lock().unwrap()[&uuid].user_id,
) {
if self
.responder
.handle_breach(
uuid,
breach,
self.appointments.lock().unwrap()[&uuid].user_id,
)
.rejected()
{
appointments_to_delete.insert(uuid);
} else {
delivered_appointments.insert(uuid);
Expand Down