Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 3 additions & 16 deletions ldk-server/src/api/list_forwarded_payments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub(crate) async fn handle_list_forwarded_payments_request(
let page_token = request.page_token.map(|p| (p.token, p.index));
let list_response = context
.paginated_kv_store
.list(
.list_values(
FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE,
FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE,
page_token,
Expand All @@ -41,21 +41,8 @@ pub(crate) async fn handle_list_forwarded_payments_request(
})?;

let mut forwarded_payments: Vec<ForwardedPayment> =
Vec::with_capacity(list_response.keys.len());
for key in list_response.keys {
let forwarded_payment_bytes = context
.paginated_kv_store
.read(
FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE,
FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE,
&key,
)
.map_err(|e| {
LdkServerError::new(
InternalServerError,
format!("Failed to read forwarded payment data: {}", e),
)
})?;
Vec::with_capacity(list_response.items.len());
for (_, forwarded_payment_bytes) in list_response.items {
let forwarded_payment = ForwardedPayment::decode(Bytes::from(forwarded_payment_bytes))
.map_err(|e| {
LdkServerError::new(
Expand Down
19 changes: 3 additions & 16 deletions ldk-server/src/api/list_payments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub(crate) async fn handle_list_payments_request(
let page_token = request.page_token.map(|p| (p.token, p.index));
let list_response = context
.paginated_kv_store
.list(
.list_values(
PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE,
PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE,
page_token,
Expand All @@ -36,21 +36,8 @@ pub(crate) async fn handle_list_payments_request(
LdkServerError::new(InternalServerError, format!("Failed to list payments: {}", e))
})?;

let mut payments: Vec<Payment> = Vec::with_capacity(list_response.keys.len());
for key in list_response.keys {
let payment_bytes = context
.paginated_kv_store
.read(
PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE,
PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE,
&key,
)
.map_err(|e| {
LdkServerError::new(
InternalServerError,
format!("Failed to read payment data: {}", e),
)
})?;
let mut payments: Vec<Payment> = Vec::with_capacity(list_response.items.len());
for (_, payment_bytes) in list_response.items {
let payment = Payment::decode(Bytes::from(payment_bytes)).map_err(|e| {
LdkServerError::new(InternalServerError, format!("Failed to decode payment: {}", e))
})?;
Expand Down
24 changes: 24 additions & 0 deletions ldk-server/src/io/persist/paginated_kv_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub trait PaginatedKVStore: Send + Sync {
/// `primary_namespace` and `secondary_namespace`.
///
/// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound
#[cfg_attr(not(test), allow(dead_code))]
fn read(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> Result<Vec<u8>, io::Error>;
Expand Down Expand Up @@ -74,20 +75,43 @@ pub trait PaginatedKVStore: Send + Sync {
/// there are no more keys to return.
///
/// [`ListResponse`]: struct.ListResponse.html
#[cfg_attr(not(test), allow(dead_code))]
fn list(
&self, primary_namespace: &str, secondary_namespace: &str,
next_page_token: Option<(String, i64)>,
) -> Result<ListResponse, io::Error>;

/// Returns a paginated list of keys and values that are stored under the given
/// `secondary_namespace` in `primary_namespace`, ordered in descending order of `time`.
///
/// This follows the same pagination and ordering semantics as [`PaginatedKVStore::list`].
fn list_values(
&self, primary_namespace: &str, secondary_namespace: &str,
next_page_token: Option<(String, i64)>,
) -> Result<ListValuesResponse, io::Error>;
}

/// Represents the response from a paginated `list` operation.
///
/// Contains the list of keys and an optional `next_page_token` that can be used to retrieve the
/// next set of keys.
#[cfg_attr(not(test), allow(dead_code))]
pub struct ListResponse {
/// A vector of keys, ordered in descending order of `time`.
pub keys: Vec<String>,

/// A token that can be used to retrieve the next set of keys.
pub next_page_token: Option<(String, i64)>,
}

/// Represents the response from a paginated `list_values` operation.
///
/// Contains the list of keys and values and an optional `next_page_token` that can be used to
/// retrieve the next set of items.
pub struct ListValuesResponse {
/// A vector of key-value pairs, ordered in descending order of `time`.
pub items: Vec<(String, Vec<u8>)>,

/// A token that can be used to retrieve the next set of items.
pub next_page_token: Option<(String, i64)>,
}
115 changes: 110 additions & 5 deletions ldk-server/src/io/persist/sqlite_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{fs, io};
use ldk_node::lightning::types::string::PrintableString;
use rusqlite::{named_params, Connection};

use crate::io::persist::paginated_kv_store::{ListResponse, PaginatedKVStore};
use crate::io::persist::paginated_kv_store::{ListResponse, ListValuesResponse, PaginatedKVStore};
use crate::io::utils::check_namespace_key_validity;

/// The default database file name.
Expand Down Expand Up @@ -105,14 +105,19 @@ impl SqliteStore {
io::Error::other(msg)
})?;

let index_creation_time_sql = format!(
"CREATE INDEX IF NOT EXISTS idx_creation_time ON {} (creation_time);",
let index_paginated_kv_namespace_time_key_sql = format!(
"CREATE INDEX IF NOT EXISTS idx_paginated_kv_namespace_time_key ON {} (
primary_namespace,
secondary_namespace,
creation_time DESC,
key ASC
);",
paginated_kv_table_name
);

connection.execute(&index_creation_time_sql, []).map_err(|e| {
connection.execute(&index_paginated_kv_namespace_time_key_sql, []).map_err(|e| {
let msg = format!(
"Failed to create index on creation_time, table {}: {}",
"Failed to create paginated namespace index, table {}: {}",
paginated_kv_table_name, e
);
io::Error::other(msg)
Expand All @@ -122,6 +127,7 @@ impl SqliteStore {
Ok(Self { connection, paginated_kv_table_name })
}

#[cfg_attr(not(test), allow(dead_code))]
fn read_internal(
&self, kv_table_name: &str, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> io::Result<Vec<u8>> {
Expand Down Expand Up @@ -285,6 +291,74 @@ impl PaginatedKVStore for SqliteStore {

Ok(ListResponse { keys, next_page_token })
}

fn list_values(
&self, primary_namespace: &str, secondary_namespace: &str,
page_token: Option<(String, i64)>,
) -> io::Result<ListValuesResponse> {
check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list_values")?;

let locked_conn = self.connection.lock().unwrap();

// The cursor predicate is paired with this exact ordering: newer creation times first,
// then keys ascending to make rows with the same creation time deterministic.
let sql = format!(
"SELECT key, creation_time, value FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace \
AND ( creation_time < :creation_time_token OR (creation_time = :creation_time_token AND key > :key_token) ) \
ORDER BY creation_time DESC, key ASC LIMIT :page_size",
self.paginated_kv_table_name
);

let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
let msg = format!("Failed to prepare statement: {}", e);
io::Error::other(msg)
})?;

let mut items: Vec<(String, Vec<u8>)> = Vec::new();
let page_token = page_token.unwrap_or(("".to_string(), i64::MAX));

let rows_iter = stmt
.query_map(
named_params! {
":primary_namespace": primary_namespace,
":secondary_namespace": secondary_namespace,
":key_token": page_token.0,
":creation_time_token": page_token.1,
":page_size": LIST_KEYS_MAX_PAGE_SIZE,
},
|row| {
let key: String = row.get(0)?;
let creation_time: i64 = row.get(1)?;
let value: Vec<u8> = row.get(2)?;
Ok((key, creation_time, value))
},
)
.map_err(|e| {
let msg = format!("Failed to retrieve queried rows: {}", e);
io::Error::other(msg)
})?;

let mut last_creation_time: Option<i64> = None;
for r in rows_iter {
let (k, ct, value) = r.map_err(|e| {
let msg = format!("Failed to retrieve queried rows: {}", e);
io::Error::other(msg)
})?;
items.push((k, value));
last_creation_time = Some(ct);
}

let last_key = items.last().map(|(key, _)| key.clone());
// Match list's token behavior: EOF is confirmed by a following empty page, not by
// withholding the token from a short non-empty page.
let next_page_token = if let (Some(k), Some(ct)) = (last_key, last_creation_time) {
Some((k, ct))
} else {
None
};

Ok(ListValuesResponse { items, next_page_token })
}
}

#[cfg(test)]
Expand Down Expand Up @@ -343,6 +417,24 @@ mod tests {
all_keys
};

let list_all_items =
|primary_namespace: &str, secondary_namespace: &str| -> Vec<(String, Vec<u8>)> {
let mut all_items = Vec::new();
let mut page_token = None;
loop {
let list_response = kv_store
.list_values(primary_namespace, secondary_namespace, page_token)
.unwrap();
assert!(list_response.items.len() <= LIST_KEYS_MAX_PAGE_SIZE as usize);
all_items.extend(list_response.items);
if list_response.next_page_token.is_none() {
break;
}
page_token = list_response.next_page_token;
}
all_items
};

// Test the basic KVStore operations.
for i in 0..110 {
kv_store
Expand All @@ -365,6 +457,14 @@ mod tests {
assert_eq!(listed_keys.len(), 110);
assert_eq!(listed_keys[0], testkey);

let listed_items = list_all_items(primary_namespace, secondary_namespace);
let listed_item_keys: Vec<String> =
listed_items.iter().map(|(key, _)| key.clone()).collect();
assert_eq!(listed_item_keys, listed_keys);
for (_, value) in listed_items {
assert_eq!(data, &*value);
}

let read_data = kv_store.read(primary_namespace, secondary_namespace, testkey).unwrap();
assert_eq!(data, &*read_data);

Expand All @@ -378,6 +478,11 @@ mod tests {
assert_eq!(listed_keys.len(), 1);
assert_eq!(listed_keys[0], max_chars);

let listed_items = list_all_items(&max_chars, &max_chars);
assert_eq!(listed_items.len(), 1);
assert_eq!(listed_items[0].0.as_str(), max_chars.as_str());
assert_eq!(data, listed_items[0].1.as_slice());

let read_data = kv_store.read(&max_chars, &max_chars, &max_chars).unwrap();
assert_eq!(data, &*read_data);
}
Expand Down
Loading