Skip to content

Commit 2874d1a

Browse files
Alexander Aringteigland
authored andcommitted
fs: dlm: add functionality to re-transmit a message
This patch introduces a retransmit functionality for a lowcomms message handle. It's just allocates a new buffer and transmit it again, no special handling about prioritize it because keeping bytestream in order. To avoid another connection look some refactor was done to make a new buffer allocation with a preexisting connection pointer. Signed-off-by: Alexander Aring <aahringo@redhat.com> Signed-off-by: David Teigland <teigland@redhat.com>
1 parent 8f2dc78 commit 2874d1a

2 files changed

Lines changed: 67 additions & 19 deletions

File tree

fs/dlm/lowcomms.c

Lines changed: 66 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ struct writequeue_entry {
126126

127127
struct dlm_msg {
128128
struct writequeue_entry *entry;
129+
struct dlm_msg *orig_msg;
130+
bool retransmit;
129131
void *ppc;
130132
int len;
131133
int idx; /* new()/commit() idx exchange */
@@ -1055,6 +1057,10 @@ static void free_entry(struct writequeue_entry *e)
10551057
struct dlm_msg *msg, *tmp;
10561058

10571059
list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
1060+
if (msg->orig_msg) {
1061+
msg->orig_msg->retransmit = false;
1062+
kref_put(&msg->orig_msg->ref, dlm_msg_release);
1063+
}
10581064
list_del(&msg->list);
10591065
kref_put(&msg->ref, dlm_msg_release);
10601066
}
@@ -1494,11 +1500,37 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
14941500
return e;
14951501
};
14961502

1503+
static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
1504+
gfp_t allocation, char **ppc,
1505+
void (*cb)(struct dlm_mhandle *mh),
1506+
struct dlm_mhandle *mh)
1507+
{
1508+
struct writequeue_entry *e;
1509+
struct dlm_msg *msg;
1510+
1511+
msg = kzalloc(sizeof(*msg), allocation);
1512+
if (!msg)
1513+
return NULL;
1514+
1515+
kref_init(&msg->ref);
1516+
1517+
e = new_wq_entry(con, len, allocation, ppc, cb, mh);
1518+
if (!e) {
1519+
kfree(msg);
1520+
return NULL;
1521+
}
1522+
1523+
msg->ppc = *ppc;
1524+
msg->len = len;
1525+
msg->entry = e;
1526+
1527+
return msg;
1528+
}
1529+
14971530
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
14981531
char **ppc, void (*cb)(struct dlm_mhandle *mh),
14991532
struct dlm_mhandle *mh)
15001533
{
1501-
struct writequeue_entry *e;
15021534
struct connection *con;
15031535
struct dlm_msg *msg;
15041536
int idx;
@@ -1518,32 +1550,18 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
15181550
return NULL;
15191551
}
15201552

1521-
msg = kzalloc(sizeof(*msg), allocation);
1553+
msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, mh);
15221554
if (!msg) {
15231555
srcu_read_unlock(&connections_srcu, idx);
15241556
return NULL;
15251557
}
15261558

1527-
kref_init(&msg->ref);
1528-
1529-
e = new_wq_entry(con, len, allocation, ppc, cb, mh);
1530-
if (!e) {
1531-
srcu_read_unlock(&connections_srcu, idx);
1532-
kfree(msg);
1533-
return NULL;
1534-
}
1535-
1536-
msg->ppc = *ppc;
1537-
msg->len = len;
1538-
msg->entry = e;
1539-
15401559
/* we assume if successful commit must called */
15411560
msg->idx = idx;
1542-
15431561
return msg;
15441562
}
15451563

1546-
void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
1564+
static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
15471565
{
15481566
struct writequeue_entry *e = msg->entry;
15491567
struct connection *con = e->con;
@@ -1561,20 +1579,49 @@ void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
15611579
spin_unlock(&con->writequeue_lock);
15621580

15631581
queue_work(send_workqueue, &con->swork);
1564-
srcu_read_unlock(&connections_srcu, msg->idx);
15651582
return;
15661583

15671584
out:
15681585
spin_unlock(&con->writequeue_lock);
1569-
srcu_read_unlock(&connections_srcu, msg->idx);
15701586
return;
15711587
}
15721588

1589+
void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
1590+
{
1591+
_dlm_lowcomms_commit_msg(msg);
1592+
srcu_read_unlock(&connections_srcu, msg->idx);
1593+
}
1594+
15731595
void dlm_lowcomms_put_msg(struct dlm_msg *msg)
15741596
{
15751597
kref_put(&msg->ref, dlm_msg_release);
15761598
}
15771599

1600+
/* does not held connections_srcu, usage workqueue only */
1601+
int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
1602+
{
1603+
struct dlm_msg *msg_resend;
1604+
char *ppc;
1605+
1606+
if (msg->retransmit)
1607+
return 1;
1608+
1609+
msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
1610+
GFP_ATOMIC, &ppc, NULL, NULL);
1611+
if (!msg_resend)
1612+
return -ENOMEM;
1613+
1614+
msg->retransmit = true;
1615+
kref_get(&msg->ref);
1616+
msg_resend->orig_msg = msg;
1617+
1618+
memcpy(ppc, msg->ppc, msg->len);
1619+
_dlm_lowcomms_commit_msg(msg_resend);
1620+
dlm_lowcomms_put_msg(msg_resend);
1621+
1622+
return 0;
1623+
}
1624+
15781625
/* Send a message */
15791626
static void send_to_sock(struct connection *con)
15801627
{

fs/dlm/lowcomms.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
2727
struct dlm_mhandle *mh);
2828
void dlm_lowcomms_commit_msg(struct dlm_msg *msg);
2929
void dlm_lowcomms_put_msg(struct dlm_msg *msg);
30+
int dlm_lowcomms_resend_msg(struct dlm_msg *msg);
3031
int dlm_lowcomms_connect_node(int nodeid);
3132
int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark);
3233
int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len);

0 commit comments

Comments
 (0)