Skip to content

Commit

Permalink
Merge pull request #3436 from tnull/2024-12-add-lightning-liquidity-c…
Browse files Browse the repository at this point in the history
…rate

Add `lightning-liquidity` crate to the workspace
  • Loading branch information
TheBlueMatt authored Dec 17, 2024
2 parents f53a09d + f68c6c5 commit 6ad40f9
Show file tree
Hide file tree
Showing 39 changed files with 8,056 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ members = [
"lightning-transaction-sync",
"lightning-macros",
"lightning-dns-resolver",
"lightning-liquidity",
"possiblyrandom",
]

Expand Down
6 changes: 5 additions & 1 deletion ci/ci-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ PIN_RELEASE_DEPS # pin the release dependencies in our main workspace
# Starting with version 0.5.9 (there is no .6-.8), the `home` crate has an MSRV of rustc 1.70.0.
[ "$RUSTC_MINOR_VERSION" -lt 70 ] && cargo update -p home --precise "0.5.5" --verbose

# proptest 1.3.0 requires rustc 1.64.0
[ "$RUSTC_MINOR_VERSION" -lt 64 ] && cargo update -p proptest --precise "1.2.0" --verbose

export RUST_BACKTRACE=1

echo -e "\n\nChecking the full workspace."
Expand All @@ -58,6 +61,7 @@ WORKSPACE_MEMBERS=(
lightning-transaction-sync
lightning-macros
lightning-dns-resolver
lightning-liquidity
possiblyrandom
)

Expand Down Expand Up @@ -110,7 +114,7 @@ echo -e "\n\nTest backtrace-debug builds"
cargo test -p lightning --verbose --color always --features backtrace

echo -e "\n\nTesting no_std builds"
for DIR in lightning-invoice lightning-rapid-gossip-sync; do
for DIR in lightning-invoice lightning-rapid-gossip-sync lightning-liquidity; do
cargo test -p $DIR --verbose --color always --no-default-features
done

Expand Down
50 changes: 50 additions & 0 deletions lightning-liquidity/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
[package]
name = "lightning-liquidity"
version = "0.1.0-alpha.6"
authors = ["John Cantrell <[email protected]>", "Elias Rohrer <[email protected]>"]
homepage = "https://lightningdevkit.org/"
license = "MIT OR Apache-2.0"
edition = "2021"
description = "Types and primitives to integrate a spec-compliant LSP with an LDK-based node."
repository = "https://github.com/lightningdevkit/lightning-liquidity/"
readme = "README.md"
keywords = ["bitcoin", "lightning", "ldk", "bdk"]
categories = ["cryptography::cryptocurrencies"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["std"]
std = ["lightning/std"]
backtrace = ["dep:backtrace"]

[dependencies]
lightning = { version = "0.0.124", path = "../lightning", default-features = false }
lightning-types = { version = "0.1", path = "../lightning-types", default-features = false }
lightning-invoice = { version = "0.32.0", path = "../lightning-invoice", default-features = false, features = ["serde"] }

bitcoin = { version = "0.32.2", default-features = false, features = ["serde"] }

chrono = { version = "0.4", default-features = false, features = ["serde", "alloc"] }
serde = { version = "1.0", default-features = false, features = ["derive", "alloc"] }
serde_json = "1.0"
backtrace = { version = "0.3", optional = true }

[dev-dependencies]
lightning = { version = "0.0.124", path = "../lightning", default-features = false, features = ["_test_utils"] }
lightning-invoice = { version = "0.32.0", path = "../lightning-invoice", default-features = false, features = ["serde", "std"] }
lightning-persister = { version = "0.0.124", path = "../lightning-persister", default-features = false }
lightning-background-processor = { version = "0.0.124", path = "../lightning-background-processor", default-features = false, features = ["std"] }

proptest = "1.0.0"
tokio = { version = "1.35", default-features = false, features = [ "rt-multi-thread", "time", "sync", "macros" ] }

[lints.rust.unexpected_cfgs]
level = "forbid"
# When adding a new cfg attribute, ensure that it is added to this list.
check-cfg = [
"cfg(lsps1_service)",
"cfg(c_bindings)",
"cfg(backtrace)",
"cfg(ldk_bench)",
]
20 changes: 20 additions & 0 deletions lightning-liquidity/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# lightning-liquidity

The goal of this crate is to provide types and primitives to integrate a spec-compliant LSP with an LDK-based node. To this end, this crate provides client-side as well as service-side logic to implement the [LSP specifications].

Currently the following specifications are supported:
- [LSPS0] defines the transport protocol with the LSP over which the other protocols communicate.
- [LSPS1] allows to order Lightning channels from an LSP. This is useful when the client needs
inbound Lightning liquidity for which they are willing and able to pay in bitcoin.
- [LSPS2] allows to generate a special invoice for which, when paid, an LSP will open a "just-in-time".
This is useful for the initial on-boarding of clients as the channel opening fees are deducted
from the incoming payment, i.e., no funds are required client-side to initiate this flow.

To get started, you'll want to setup a `LiquidityManager` and configure it to be the `CustomMessageHandler` of your LDK node. You can then call `LiquidityManager::lsps1_client_handler` / `LiquidityManager::lsps2_client_handler`, or `LiquidityManager::lsps2_service_handler`, to access the respective client-side or service-side handlers.

`LiquidityManager` uses an eventing system to notify the user about important updates to the protocol flow. To this end, you will need to handle events emitted via one of the event handling methods provided by `LiquidityManager`, e.g., `LiquidityManager::next_event`.

[LSP specifications]: https://github.com/BitcoinAndLightningLayerSpecs/lsp
[LSPS0]: https://github.com/BitcoinAndLightningLayerSpecs/lsp/tree/main/LSPS0
[LSPS1]: https://github.com/BitcoinAndLightningLayerSpecs/lsp/tree/main/LSPS1
[LSPS2]: https://github.com/BitcoinAndLightningLayerSpecs/lsp/tree/main/LSPS2
245 changes: 245 additions & 0 deletions lightning-liquidity/src/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
// This file is Copyright its original authors, visible in version control
// history.
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.

//! Events are surfaced by the library to indicate some action must be taken
//! by the end-user.
//!
//! Because we don't have a built-in runtime, it's up to the end-user to poll
//! [`LiquidityManager::get_and_clear_pending_events`] to receive events.
//!
//! [`LiquidityManager::get_and_clear_pending_events`]: crate::LiquidityManager::get_and_clear_pending_events
use crate::lsps0;
use crate::lsps1;
use crate::lsps2;
use crate::prelude::{Vec, VecDeque};
use crate::sync::{Arc, Mutex};

use core::future::Future;
use core::task::{Poll, Waker};

/// The maximum queue size we allow before starting to drop events.
pub const MAX_EVENT_QUEUE_SIZE: usize = 1000;

pub(crate) struct EventQueue {
queue: Arc<Mutex<VecDeque<Event>>>,
waker: Arc<Mutex<Option<Waker>>>,
#[cfg(feature = "std")]
condvar: crate::sync::Condvar,
}

impl EventQueue {
pub fn new() -> Self {
let queue = Arc::new(Mutex::new(VecDeque::new()));
let waker = Arc::new(Mutex::new(None));
#[cfg(feature = "std")]
{
let condvar = crate::sync::Condvar::new();
Self { queue, waker, condvar }
}
#[cfg(not(feature = "std"))]
Self { queue, waker }
}

pub fn enqueue(&self, event: Event) {
{
let mut queue = self.queue.lock().unwrap();
if queue.len() < MAX_EVENT_QUEUE_SIZE {
queue.push_back(event);
} else {
return;
}
}

if let Some(waker) = self.waker.lock().unwrap().take() {
waker.wake();
}
#[cfg(feature = "std")]
self.condvar.notify_one();
}

pub fn next_event(&self) -> Option<Event> {
self.queue.lock().unwrap().pop_front()
}

pub async fn next_event_async(&self) -> Event {
EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
}

#[cfg(feature = "std")]
pub fn wait_next_event(&self) -> Event {
let mut queue = self
.condvar
.wait_while(self.queue.lock().unwrap(), |queue: &mut VecDeque<Event>| queue.is_empty())
.unwrap();

let event = queue.pop_front().expect("non-empty queue");
let should_notify = !queue.is_empty();

drop(queue);

if should_notify {
if let Some(waker) = self.waker.lock().unwrap().take() {
waker.wake();
}

self.condvar.notify_one();
}

event
}

pub fn get_and_clear_pending_events(&self) -> Vec<Event> {
self.queue.lock().unwrap().split_off(0).into()
}
}

/// An event which you should probably take some action in response to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
/// An LSPS0 client event.
LSPS0Client(lsps0::event::LSPS0ClientEvent),
/// An LSPS1 (Channel Request) client event.
LSPS1Client(lsps1::event::LSPS1ClientEvent),
/// An LSPS1 (Channel Request) server event.
#[cfg(lsps1_service)]
LSPS1Service(lsps1::event::LSPS1ServiceEvent),
/// An LSPS2 (JIT Channel) client event.
LSPS2Client(lsps2::event::LSPS2ClientEvent),
/// An LSPS2 (JIT Channel) server event.
LSPS2Service(lsps2::event::LSPS2ServiceEvent),
}

struct EventFuture {
event_queue: Arc<Mutex<VecDeque<Event>>>,
waker: Arc<Mutex<Option<Waker>>>,
}

impl Future for EventFuture {
type Output = Event;

fn poll(
self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>,
) -> core::task::Poll<Self::Output> {
if let Some(event) = self.event_queue.lock().unwrap().pop_front() {
Poll::Ready(event)
} else {
*self.waker.lock().unwrap() = Some(cx.waker().clone());
Poll::Pending
}
}
}

#[cfg(test)]
mod tests {
#[tokio::test]
#[cfg(feature = "std")]
async fn event_queue_works() {
use super::*;
use crate::lsps0::event::LSPS0ClientEvent;
use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
use core::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use std::time::Duration;

let event_queue = Arc::new(EventQueue::new());
assert_eq!(event_queue.next_event(), None);

let secp_ctx = Secp256k1::new();
let counterparty_node_id =
PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap());
let expected_event = Event::LSPS0Client(LSPS0ClientEvent::ListProtocolsResponse {
counterparty_node_id,
protocols: Vec::new(),
});

for _ in 0..3 {
event_queue.enqueue(expected_event.clone());
}

assert_eq!(event_queue.wait_next_event(), expected_event);
assert_eq!(event_queue.next_event_async().await, expected_event);
assert_eq!(event_queue.next_event(), Some(expected_event.clone()));
assert_eq!(event_queue.next_event(), None);

// Check `next_event_async` won't return if the queue is empty and always rather timeout.
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(10)) => {
// Timeout
}
_ = event_queue.next_event_async() => {
panic!();
}
}
assert_eq!(event_queue.next_event(), None);

// Check we get the expected number of events when polling/enqueuing concurrently.
let enqueued_events = AtomicU16::new(0);
let received_events = AtomicU16::new(0);
let mut delayed_enqueue = false;

for _ in 0..25 {
event_queue.enqueue(expected_event.clone());
enqueued_events.fetch_add(1, Ordering::SeqCst);
}

loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(10)), if !delayed_enqueue => {
event_queue.enqueue(expected_event.clone());
enqueued_events.fetch_add(1, Ordering::SeqCst);
delayed_enqueue = true;
}
e = event_queue.next_event_async() => {
assert_eq!(e, expected_event);
received_events.fetch_add(1, Ordering::SeqCst);

event_queue.enqueue(expected_event.clone());
enqueued_events.fetch_add(1, Ordering::SeqCst);
}
e = event_queue.next_event_async() => {
assert_eq!(e, expected_event);
received_events.fetch_add(1, Ordering::SeqCst);
}
}

if delayed_enqueue
&& received_events.load(Ordering::SeqCst) == enqueued_events.load(Ordering::SeqCst)
{
break;
}
}
assert_eq!(event_queue.next_event(), None);

// Check we operate correctly, even when mixing and matching blocking and async API calls.
let (tx, mut rx) = tokio::sync::watch::channel(());
let thread_queue = Arc::clone(&event_queue);
let thread_event = expected_event.clone();
std::thread::spawn(move || {
let e = thread_queue.wait_next_event();
assert_eq!(e, thread_event);
tx.send(()).unwrap();
});

let thread_queue = Arc::clone(&event_queue);
let thread_event = expected_event.clone();
std::thread::spawn(move || {
// Sleep a bit before we enqueue the events everybody is waiting for.
std::thread::sleep(Duration::from_millis(20));
thread_queue.enqueue(thread_event.clone());
thread_queue.enqueue(thread_event.clone());
});

let e = event_queue.next_event_async().await;
assert_eq!(e, expected_event.clone());

rx.changed().await.unwrap();
assert_eq!(event_queue.next_event(), None);
}
}
Loading

0 comments on commit 6ad40f9

Please sign in to comment.