Skip to content

Commit

Permalink
refactor(MarketCoinOps): make wait_for_htlc_tx_spend async (#2265)
Browse files Browse the repository at this point in the history
  • Loading branch information
shamardy authored Nov 8, 2024
1 parent 99cb87a commit f401497
Show file tree
Hide file tree
Showing 21 changed files with 215 additions and 233 deletions.
128 changes: 61 additions & 67 deletions mm2src/coins/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2569,18 +2569,18 @@ impl MarketCoinOps for EthCoin {
Box::new(fut.boxed().compat())
}

fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionFut {
let unverified: UnverifiedTransactionWrapper = try_tx_fus!(rlp::decode(args.tx_bytes));
let tx = try_tx_fus!(SignedEthTx::new(unverified));
async fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionResult {
let unverified: UnverifiedTransactionWrapper = try_tx_s!(rlp::decode(args.tx_bytes));
let tx = try_tx_s!(SignedEthTx::new(unverified));

let swap_contract_address = match args.swap_contract_address {
Some(addr) => try_tx_fus!(addr.try_to_address()),
Some(addr) => try_tx_s!(addr.try_to_address()),
None => match tx.unsigned().action() {
Call(address) => *address,
Create => {
return Box::new(futures01::future::err(TransactionErr::Plain(ERRL!(
return Err(TransactionErr::Plain(ERRL!(
"Invalid payment action: the payment action cannot be create"
))))
)))
},
},
};
Expand All @@ -2589,85 +2589,79 @@ impl MarketCoinOps for EthCoin {
EthCoinType::Eth => get_function_name("ethPayment", args.watcher_reward),
EthCoinType::Erc20 { .. } => get_function_name("erc20Payment", args.watcher_reward),
EthCoinType::Nft { .. } => {
return Box::new(futures01::future::err(TransactionErr::ProtocolNotSupported(ERRL!(
return Err(TransactionErr::ProtocolNotSupported(ERRL!(
"Nft Protocol is not supported yet!"
))))
)))
},
};

let payment_func = try_tx_fus!(SWAP_CONTRACT.function(&func_name));
let decoded = try_tx_fus!(decode_contract_call(payment_func, tx.unsigned().data()));
let payment_func = try_tx_s!(SWAP_CONTRACT.function(&func_name));
let decoded = try_tx_s!(decode_contract_call(payment_func, tx.unsigned().data()));
let id = match decoded.first() {
Some(Token::FixedBytes(bytes)) => bytes.clone(),
invalid_token => {
return Box::new(futures01::future::err(TransactionErr::Plain(ERRL!(
return Err(TransactionErr::Plain(ERRL!(
"Expected Token::FixedBytes, got {:?}",
invalid_token
))))
)))
},
};
let selfi = self.clone();
let from_block = args.from_block;
let wait_until = args.wait_until;
let check_every = args.check_every;
let fut = async move {
loop {
if now_sec() > wait_until {
return TX_PLAIN_ERR!(
"Waited too long until {} for transaction {:?} to be spent ",
wait_until,
tx,
);
}

let current_block = match selfi.current_block().compat().await {
Ok(b) => b,
Err(e) => {
error!("Error getting block number: {}", e);
Timer::sleep(5.).await;
continue;
},
};
loop {
if now_sec() > args.wait_until {
return TX_PLAIN_ERR!(
"Waited too long until {} for transaction {:?} to be spent ",
args.wait_until,
tx,
);
}

let events = match selfi
.spend_events(swap_contract_address, from_block, current_block)
.compat()
.await
{
Ok(ev) => ev,
Err(e) => {
error!("Error getting spend events: {}", e);
Timer::sleep(5.).await;
continue;
},
};
let current_block = match self.current_block().compat().await {
Ok(b) => b,
Err(e) => {
error!("Error getting block number: {}", e);
Timer::sleep(5.).await;
continue;
},
};

let found = events.iter().find(|event| &event.data.0[..32] == id.as_slice());
let events = match self
.spend_events(swap_contract_address, args.from_block, current_block)
.compat()
.await
{
Ok(ev) => ev,
Err(e) => {
error!("Error getting spend events: {}", e);
Timer::sleep(5.).await;
continue;
},
};

if let Some(event) = found {
if let Some(tx_hash) = event.transaction_hash {
let transaction = match selfi.transaction(TransactionId::Hash(tx_hash)).await {
Ok(Some(t)) => t,
Ok(None) => {
info!("Tx {} not found yet", tx_hash);
Timer::sleep(check_every).await;
continue;
},
Err(e) => {
error!("Get tx {} error: {}", tx_hash, e);
Timer::sleep(check_every).await;
continue;
},
};
let found = events.iter().find(|event| &event.data.0[..32] == id.as_slice());

return Ok(TransactionEnum::from(try_tx_s!(signed_tx_from_web3_tx(transaction))));
}
}
if let Some(event) = found {
if let Some(tx_hash) = event.transaction_hash {
let transaction = match self.transaction(TransactionId::Hash(tx_hash)).await {
Ok(Some(t)) => t,
Ok(None) => {
info!("Tx {} not found yet", tx_hash);
Timer::sleep(args.check_every).await;
continue;
},
Err(e) => {
error!("Get tx {} error: {}", tx_hash, e);
Timer::sleep(args.check_every).await;
continue;
},
};

Timer::sleep(5.).await;
return Ok(TransactionEnum::from(try_tx_s!(signed_tx_from_web3_tx(transaction))));
}
}
};
Box::new(fut.boxed().compat())

Timer::sleep(5.).await;
}
}

fn tx_enum_from_bytes(&self, bytes: &[u8]) -> Result<TransactionEnum, MmError<TxMarshalingErr>> {
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/eth/eth_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ fn test_wait_for_payment_spend_timeout() {
184, 42, 106,
];

assert!(block_on_f01(coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs {
assert!(block_on(coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs {
tx_bytes: &tx_bytes,
secret_hash: &[],
wait_until,
Expand Down
83 changes: 39 additions & 44 deletions mm2src/coins/lightning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1164,58 +1164,53 @@ impl MarketCoinOps for LightningCoin {
Box::new(fut.boxed().compat())
}

fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionFut {
let payment_hash = try_tx_fus!(payment_hash_from_slice(args.tx_bytes));
async fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionResult {
let payment_hash = try_tx_s!(payment_hash_from_slice(args.tx_bytes));
let payment_hex = hex::encode(payment_hash.0);

let coin = self.clone();
let wait_until = args.wait_until;
let fut = async move {
loop {
if now_sec() > wait_until {
return Err(TransactionErr::Plain(ERRL!(
"Waited too long until {} for payment {} to be spent",
wait_until,
payment_hex
)));
}
loop {
if now_sec() > args.wait_until {
return Err(TransactionErr::Plain(ERRL!(
"Waited too long until {} for payment {} to be spent",
args.wait_until,
payment_hex
)));
}

match coin.db.get_payment_from_db(payment_hash).await {
Ok(Some(payment)) => match payment.status {
HTLCStatus::Pending => (),
HTLCStatus::Claimable => {
return Err(TransactionErr::Plain(ERRL!(
"Payment {} has an invalid status of {} in the db",
payment_hex,
payment.status
)))
},
HTLCStatus::Succeeded => return Ok(TransactionEnum::LightningPayment(payment_hash)),
HTLCStatus::Failed => {
return Err(TransactionErr::Plain(ERRL!(
"Lightning swap payment {} failed",
payment_hex
)))
},
},
Ok(None) => return Err(TransactionErr::Plain(ERRL!("Payment {} not found in DB", payment_hex))),
Err(e) => {
match self.db.get_payment_from_db(payment_hash).await {
Ok(Some(payment)) => match payment.status {
HTLCStatus::Pending => (),
HTLCStatus::Claimable => {
return Err(TransactionErr::Plain(ERRL!(
"Error getting payment {} from db: {}",
"Payment {} has an invalid status of {} in the db",
payment_hex,
e
payment.status
)))
},
}

// note: When sleeping for only 1 second the test_send_payment_and_swaps unit test took 20 seconds to complete instead of 37 seconds when sleeping for 10 seconds
// Todo: In next sprints, should add a mutex for lightning swap payments to avoid overloading the shared db connection with requests when the sleep time is reduced and multiple swaps are ran together.
// Todo: The aim is to make lightning swap payments as fast as possible, more sleep time can be allowed for maker payment since it waits for the secret to be revealed on another chain first.
// Todo: Running swap payments statuses should be loaded from db on restarts in this case.
Timer::sleep(10.).await;
HTLCStatus::Succeeded => return Ok(TransactionEnum::LightningPayment(payment_hash)),
HTLCStatus::Failed => {
return Err(TransactionErr::Plain(ERRL!(
"Lightning swap payment {} failed",
payment_hex
)))
},
},
Ok(None) => return Err(TransactionErr::Plain(ERRL!("Payment {} not found in DB", payment_hex))),
Err(e) => {
return Err(TransactionErr::Plain(ERRL!(
"Error getting payment {} from db: {}",
payment_hex,
e
)))
},
}
};
Box::new(fut.boxed().compat())

// note: When sleeping for only 1 second the test_send_payment_and_swaps unit test took 20 seconds to complete instead of 37 seconds when sleeping for 10 seconds
// Todo: In next sprints, should add a mutex for lightning swap payments to avoid overloading the shared db connection with requests when the sleep time is reduced and multiple swaps are ran together.
// Todo: The aim is to make lightning swap payments as fast as possible, more sleep time can be allowed for maker payment since it waits for the secret to be revealed on another chain first.
// Todo: Running swap payments statuses should be loaded from db on restarts in this case.
Timer::sleep(10.).await;
}
}

fn tx_enum_from_bytes(&self, bytes: &[u8]) -> Result<TransactionEnum, MmError<TxMarshalingErr>> {
Expand Down
1 change: 0 additions & 1 deletion mm2src/coins/lightning/ln_platform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,6 @@ impl Platform {
check_every: TAKER_PAYMENT_SPEND_SEARCH_INTERVAL,
watcher_reward: false,
})
.compat()
.await
.map_to_mm(|e| SaveChannelClosingError::WaitForFundingTxSpendError(e.get_plain_text_format()))?;

Expand Down
8 changes: 7 additions & 1 deletion mm2src/coins/lp_coins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2028,7 +2028,13 @@ pub trait MarketCoinOps {

fn wait_for_confirmations(&self, input: ConfirmPaymentInput) -> Box<dyn Future<Item = (), Error = String> + Send>;

fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionFut;
/// Waits for spending/unlocking of funds locked in a HTLC construction specific to the coin's
/// chain. Implementation should monitor locked funds (UTXO/contract/etc.) until funds are
/// spent/unlocked or timeout is reached.
///
/// Returns spending tx/event from mempool/pending state to allow prompt extraction of preimage
/// secret.
async fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionResult;

fn tx_enum_from_bytes(&self, bytes: &[u8]) -> Result<TransactionEnum, MmError<TxMarshalingErr>>;

Expand Down
22 changes: 5 additions & 17 deletions mm2src/coins/qrc20.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1254,23 +1254,11 @@ impl MarketCoinOps for Qrc20Coin {
Box::new(fut.boxed().compat())
}

fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionFut {
let tx: UtxoTx = try_tx_fus!(deserialize(args.tx_bytes).map_err(|e| ERRL!("{:?}", e)));

let selfi = self.clone();
let WaitForHTLCTxSpendArgs {
check_every,
from_block,
wait_until,
..
} = args;
let fut = async move {
selfi
.wait_for_tx_spend_impl(tx, wait_until, from_block, check_every)
.map_err(TransactionErr::Plain)
.await
};
Box::new(fut.boxed().compat())
async fn wait_for_htlc_tx_spend(&self, args: WaitForHTLCTxSpendArgs<'_>) -> TransactionResult {
let tx: UtxoTx = try_tx_s!(deserialize(args.tx_bytes).map_err(|e| ERRL!("{:?}", e)));
self.wait_for_tx_spend_impl(tx, args.wait_until, args.from_block, args.check_every)
.map_err(TransactionErr::Plain)
.await
}

fn tx_enum_from_bytes(&self, bytes: &[u8]) -> Result<TransactionEnum, MmError<TxMarshalingErr>> {
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/qrc20/qrc20_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ fn test_wait_for_tx_spend_malicious() {
let payment_tx = hex::decode("01000000016601daa208531d20532c460d0c86b74a275f4a126bbffcf4eafdf33835af2859010000006a47304402205825657548bc1b5acf3f4bb2f89635a02b04f3228cd08126e63c5834888e7ac402207ca05fa0a629a31908a97a508e15076e925f8e621b155312b7526a6666b06a76012103693bff1b39e8b5a306810023c29b95397eb395530b106b1820ea235fd81d9ce9ffffffff020000000000000000e35403a0860101284cc49b415b2a8620ad3b72361a5aeba5dffd333fb64750089d935a1ec974d6a91ef4f24ff6ba0000000000000000000000000000000000000000000000000000000001312d00000000000000000000000000d362e096e873eb7907e205fadc6175c6fec7bc44000000000000000000000000783cf0be521101942da509846ea476e683aad8324b6b2e5444c2639cc0fb7bcea5afba3f3cdce239000000000000000000000000000000000000000000000000000000000000000000000000000000005f855c7614ba8b71f3544b93e2f681f996da519a98ace0107ac2203de400000000001976a9149e032d4b0090a11dc40fe6c47601499a35d55fbb88ac415d855f").unwrap();
let wait_until = now_sec() + 1;
let from_block = 696245;
let found = block_on_f01(coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs {
let found = block_on(coin.wait_for_htlc_tx_spend(WaitForHTLCTxSpendArgs {
tx_bytes: &payment_tx,
secret_hash: &[],
wait_until,
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/siacoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ impl MarketCoinOps for SiaCoin {
unimplemented!()
}

fn wait_for_htlc_tx_spend(&self, _args: WaitForHTLCTxSpendArgs<'_>) -> TransactionFut { unimplemented!() }
async fn wait_for_htlc_tx_spend(&self, _args: WaitForHTLCTxSpendArgs<'_>) -> TransactionResult { unimplemented!() }

fn tx_enum_from_bytes(&self, _bytes: &[u8]) -> Result<TransactionEnum, MmError<TxMarshalingErr>> {
MmError::err(TxMarshalingErr::NotSupported(
Expand Down
Loading

0 comments on commit f401497

Please sign in to comment.