Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
- `EsploraSyncConfig` and `ElectrumSyncConfig` now support `force_wallet_full_scan`. When set,
the on-chain wallet keeps using BDK `full_scan` instead of incremental sync until a full scan
succeeds, allowing restored wallets to rediscover funds sent to previously-unknown addresses.
- A new `Node::list_payments_paginated` method allows retrieving payments page-by-page, ordered
from most recently created to least recently created, instead of all at once.

## Bug Fixes and Improvements
- Building a fresh node against a Bitcoin Core RPC or REST chain source that fails to return the
Expand Down
7 changes: 7 additions & 0 deletions bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ interface Node {
void remove_payment([ByRef]PaymentId payment_id);
BalanceDetails list_balances();
sequence<PaymentDetails> list_payments();
[Throws=NodeError]
PaymentDetailsPage list_payments_paginated(PageToken? page_token);
sequence<PeerDetails> list_peers();
sequence<ChannelDetails> list_channels();
NetworkGraph network_graph();
Expand Down Expand Up @@ -261,6 +263,8 @@ enum PaymentFailureReason {

typedef dictionary PaymentDetails;

typedef dictionary PaymentDetailsPage;

[Remote]
dictionary RouteParametersConfig {
u64? max_total_routing_fee_msat;
Expand Down Expand Up @@ -376,6 +380,9 @@ typedef string Address;
[Custom]
typedef string OfferId;

[Custom]
typedef string PageToken;

[Custom]
typedef string PaymentId;

Expand Down
4 changes: 4 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ pub(crate) const BDK_CLIENT_STOP_GAP: usize = 20;
// The number of concurrent requests made against the API provider.
pub(crate) const BDK_CLIENT_CONCURRENCY: usize = 4;

// The maximum number of object reads we keep in flight at once when reading a page from the
// data store.
pub(crate) const DATA_STORE_READ_CONCURRENCY_LIMIT: usize = 10;

// The timeout after which we abandon retrying failed payments.
pub(crate) const LDK_PAYMENT_RETRY_TIMEOUT: Duration = Duration::from_secs(10);

Expand Down
141 changes: 139 additions & 2 deletions src/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
// accordance with one or both of these licenses.

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::ops::Deref;
use std::sync::{Arc, Mutex};

use lightning::util::persist::KVStore;
use lightning::util::persist::{KVStore, PageToken, PaginatedKVStore};
use lightning::util::ser::{Readable, Writeable};

use crate::config::DATA_STORE_READ_CONCURRENCY_LIMIT;
use crate::logger::{log_error, LdkLogger};
use crate::types::DynStore;
use crate::Error;
Expand Down Expand Up @@ -179,6 +180,104 @@ where
self.objects.lock().expect("lock").values().filter(f).cloned().collect::<Vec<SO>>()
}

/// Returns a page of objects read from the underlying store, ordered from most recently
/// created to least recently created, together with a token that can be passed to a
/// subsequent call to retrieve the next page.
pub(crate) async fn list_page(
&self, page_token: Option<PageToken>,
) -> Result<(Vec<SO>, Option<PageToken>), Error> {
let response = PaginatedKVStore::list_paginated(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, we're still leaning on all DataStore entries being kept in memory everywhere else (which we should change soon as mentioned above). So right now it's a bit odd to have the paginated version being the only one calling through to KVStore, and not even using any cache (so it's likely pretty slow).

So, should this just use the in-memory entries? If not, we could consider already making the jump to not keep all entries in memory, but then we'd need some (presumably LRU?) caching logic for DataStore first, before we add pagination support/switch it to use pagination?

&*self.kv_store,
&self.primary_namespace,
&self.secondary_namespace,
page_token,
)
.await
.map_err(|e| {
log_error!(
self.logger,
"Listing object data under {}/{} failed due to: {}",
&self.primary_namespace,
&self.secondary_namespace,
e
);
Error::PersistenceFailed
})?;

// Read the page's objects through a sliding window: keep up to
// `DATA_STORE_READ_CONCURRENCY_LIMIT` reads in flight and await them in key order to
// preserve the page's newest-first ordering. The page size is implementation-defined,
// so we can't rely on pages being small.
let mut spawn_iter = response.keys.iter();
let mut read_handles = VecDeque::with_capacity(DATA_STORE_READ_CONCURRENCY_LIMIT);
for key in spawn_iter.by_ref().take(DATA_STORE_READ_CONCURRENCY_LIMIT) {
read_handles.push_back(tokio::spawn(KVStore::read(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drive-by question: are there plans for something like list_paginated_values, so SQL backends can fetch a page of serialized values in a single query instead of doing list_paginated plus one read per key?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a follow-up optimization that could eventually make sense (though, maybe even worth benchmarking if necessary for the concrete use cases). Mind opening an issue for it so we don't forget to revisit?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

&*self.kv_store,
&self.primary_namespace,
&self.secondary_namespace,
key,
)));
}

let mut objects = Vec::with_capacity(response.keys.len());
for key in response.keys.iter() {
let handle = read_handles.pop_front().expect("read spawned for each key");

// Refill the window for every read we await, if keys remain.
if let Some(next_key) = spawn_iter.next() {
read_handles.push_back(tokio::spawn(KVStore::read(
&*self.kv_store,
&self.primary_namespace,
&self.secondary_namespace,
next_key,
)));
}

let read_res = handle.await.map_err(|e| {
log_error!(
self.logger,
"Reading object data for key {}/{}/{} failed due to: {}",
&self.primary_namespace,
&self.secondary_namespace,
key,
e
);
Error::PersistenceFailed
})?;
let data = match read_res {
Ok(data) => data,
// The object was removed after we listed the page's keys. Skip it rather than
// failing the page.
Err(e) if e.kind() == lightning::io::ErrorKind::NotFound => continue,
Err(e) => {
log_error!(
self.logger,
"Reading object data for key {}/{}/{} failed due to: {}",
&self.primary_namespace,
&self.secondary_namespace,
key,
e
);
return Err(Error::PersistenceFailed);
},
};
let object = SO::read(&mut &data[..]).map_err(|e| {
log_error!(
self.logger,
"Failed to deserialize object data for key {}/{}/{}: {}",
&self.primary_namespace,
&self.secondary_namespace,
key,
e
);
Error::PersistenceFailed
})?;
objects.push(object);
}

Ok((objects, response.next_page_token))
}

async fn persist(&self, object: &SO) -> Result<(), Error> {
let (store_key, data) = Self::encode_object(object);
self.persist_encoded(store_key, data).await
Expand Down Expand Up @@ -338,6 +437,44 @@ mod tests {
)
}

#[tokio::test]
async fn list_page_paginates_in_reverse_creation_order() {
let store: Arc<DynStore> = Arc::new(DynStoreWrapper(InMemoryStore::new()));
let logger = Arc::new(TestLogger::new());
let data_store: DataStore<TestObject, Arc<TestLogger>> = DataStore::new(
Vec::new(),
"datastore_test_primary".to_string(),
"datastore_test_secondary".to_string(),
Arc::clone(&store),
logger,
);

// Insert more objects than fit in a single page to exercise the pagination loop.
let num_objects = 120u32;
for i in 0..num_objects {
let id = TestObjectId { id: i.to_be_bytes() };
data_store.insert(TestObject { id, data: [7u8; 3] }).await.unwrap();
}

let mut listed = Vec::with_capacity(num_objects as usize);
let mut page_token = None;
loop {
let (page, next_page_token) = data_store.list_page(page_token).await.unwrap();
assert!(!page.is_empty());
listed.extend(page);
page_token = next_page_token;
if page_token.is_none() {
break;
}
}

let expected: Vec<TestObject> = (0..num_objects)
.rev()
.map(|i| TestObject { id: TestObjectId { id: i.to_be_bytes() }, data: [7u8; 3] })
.collect();
assert_eq!(listed, expected);
}

#[tokio::test]
async fn data_is_persisted() {
let store: Arc<DynStore> = Arc::new(DynStoreWrapper(InMemoryStore::new()));
Expand Down
12 changes: 11 additions & 1 deletion src/ffi/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ pub use crate::entropy::{generate_entropy_mnemonic, NodeEntropy, WordCount};
use crate::error::Error;
pub use crate::liquidity::LSPS1OrderStatus;
pub use crate::logger::{LogLevel, LogRecord, LogWriter};
use crate::{hex_utils, SocketAddress, UserChannelId};
use crate::{hex_utils, PageToken, SocketAddress, UserChannelId};

uniffi::custom_type!(PublicKey, String, {
remote,
Expand Down Expand Up @@ -893,6 +893,16 @@ uniffi::custom_type!(OfferId, String, {
},
});

uniffi::custom_type!(PageToken, String, {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the remote type rather than adding a custom type (as we try to get rid of the latter over time)?

remote,
try_lift: |val| {
Ok(PageToken::new(val))
},
lower: |obj| {
obj.to_string()
},
});

uniffi::custom_type!(PaymentId, String, {
remote,
try_lift: |val| {
Expand Down
25 changes: 25 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ use lightning::ln::peer_handler::CustomMessageHandler;
use lightning::routing::gossip::NodeAlias;
use lightning::sign::EntropySource;
use lightning::util::persist::KVStore;
pub use lightning::util::persist::PageToken;
use lightning::util::wallet_utils::{Input, Wallet as LdkWallet};
use lightning_background_processor::process_events_async;
pub use lightning_invoice;
Expand Down Expand Up @@ -254,6 +255,16 @@ pub struct Node {
_leak_checker: LeakChecker,
}

/// A page of payments returned from a paginated listing.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct PaymentDetailsPage {
/// Payments in this page, ordered from most recently created to least recently created.
pub payments: Vec<PaymentDetails>,
/// Token to pass to the next call to continue listing, if another page exists.
pub next_page_token: Option<PageToken>,
}

impl Node {
/// Starts the necessary background tasks, such as handling events coming from user input,
/// LDK/BDK, and the peer-to-peer network.
Expand Down Expand Up @@ -2135,6 +2146,20 @@ impl Node {
self.payment_store.list_filter(|_| true)
}

/// Retrieves a page of payments from the underlying paginated store, ordered from most
/// recently created to least recently created.
///
/// Pass `None` to start listing from the most recently created payment. If the returned
/// [`PaymentDetailsPage::next_page_token`] is `Some`, pass it to a subsequent call to
/// retrieve the next page.
pub fn list_payments_paginated(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, do we really want an additional method? At some point (hopefully for v0.9) we'll stop keeping all entries in memory and at that point at the latest the current shape of list_payments isn't feasible anymore. So should this just replace list_payments? Or not yet?

&self, page_token: Option<PageToken>,
) -> Result<PaymentDetailsPage, Error> {
let (payments, next_page_token) =
self.runtime.block_on(self.payment_store.list_page(page_token))?;
Ok(PaymentDetailsPage { payments, next_page_token })
}

/// Retrieves a list of known peers.
pub fn list_peers(&self) -> Vec<PeerDetails> {
let mut peers = Vec::new();
Expand Down
Loading