Skip to content

Commit 11ed914

Browse files
spikehaxboe
authored and committed
io_uring/zcrx: add io_recvzc request
Add io_uring opcode OP_RECV_ZC for doing zero copy reads out of a socket. Only the connection should land on the specific rx queue set up for zero copy, and the socket must be handled by the io_uring instance that the rx queue was registered for zero copy with. That's because neither net_iovs / buffers from our queue can be read by outside applications, nor is zero copy possible if traffic for the zero copy connection goes to another queue. This coordination is outside of the scope of this patch series. Also, any traffic directed to the zero copy enabled queue is immediately visible to the application, which is why CAP_NET_ADMIN is required at the registration step. Of course, no data is actually read out of the socket, it has already been copied by the netdev into userspace memory via DMA. OP_RECV_ZC reads skbs out of the socket and checks that its frags are indeed net_iovs that belong to io_uring. A cqe is queued for each one of these frags. Recall that each cqe is a big cqe, with the top half being an io_uring_zcrx_cqe. The cqe res field contains the len or error. The lower IORING_ZCRX_AREA_SHIFT bits of the struct io_uring_zcrx_cqe::off field contain the offset relative to the start of the zero copy area. The upper part of the off field is trivially zero, and will be used to carry the area id. For now, there is no limit as to how much work each OP_RECV_ZC request does. It will attempt to drain a socket of all available data. This request always operates in multishot mode. Reviewed-by: Jens Axboe <axboe@kernel.dk> Signed-off-by: David Wei <dw@davidwei.uk> Acked-by: Jakub Kicinski <kuba@kernel.org> Link: https://lore.kernel.org/r/20250215000947.789731-7-dw@davidwei.uk Signed-off-by: Jens Axboe <axboe@kernel.dk>
1 parent db07044 commit 11ed914

6 files changed

Lines changed: 302 additions & 1 deletion

File tree

include/uapi/linux/io_uring.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ struct io_uring_sqe {
8787
union {
8888
__s32 splice_fd_in;
8989
__u32 file_index;
90+
__u32 zcrx_ifq_idx;
9091
__u32 optlen;
9192
struct {
9293
__u16 addr_len;
@@ -278,6 +279,7 @@ enum io_uring_op {
278279
IORING_OP_FTRUNCATE,
279280
IORING_OP_BIND,
280281
IORING_OP_LISTEN,
282+
IORING_OP_RECV_ZC,
281283

282284
/* this goes last, obviously */
283285
IORING_OP_LAST,

io_uring/io_uring.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,16 @@ static inline bool io_get_cqe(struct io_ring_ctx *ctx, struct io_uring_cqe **ret
185185
return io_get_cqe_overflow(ctx, ret, false);
186186
}
187187

188+
static inline bool io_defer_get_uncommited_cqe(struct io_ring_ctx *ctx,
189+
struct io_uring_cqe **cqe_ret)
190+
{
191+
io_lockdep_assert_cq_locked(ctx);
192+
193+
ctx->cq_extra++;
194+
ctx->submit_state.cq_flush = true;
195+
return io_get_cqe(ctx, cqe_ret);
196+
}
197+
188198
static __always_inline bool io_fill_cqe_req(struct io_ring_ctx *ctx,
189199
struct io_kiocb *req)
190200
{

io_uring/net.c

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "net.h"
1717
#include "notif.h"
1818
#include "rsrc.h"
19+
#include "zcrx.h"
1920

2021
#if defined(CONFIG_NET)
2122
struct io_shutdown {
@@ -89,6 +90,13 @@ struct io_sr_msg {
8990
*/
9091
#define MULTISHOT_MAX_RETRY 32
9192

93+
struct io_recvzc {
94+
struct file *file;
95+
unsigned msg_flags;
96+
u16 flags;
97+
struct io_zcrx_ifq *ifq;
98+
};
99+
92100
int io_shutdown_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
93101
{
94102
struct io_shutdown *shutdown = io_kiocb_to_cmd(req, struct io_shutdown);
@@ -1227,6 +1235,70 @@ int io_recv(struct io_kiocb *req, unsigned int issue_flags)
12271235
return ret;
12281236
}
12291237

1238+
int io_recvzc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1239+
{
1240+
struct io_recvzc *zc = io_kiocb_to_cmd(req, struct io_recvzc);
1241+
unsigned ifq_idx;
1242+
1243+
if (unlikely(sqe->file_index || sqe->addr2 || sqe->addr ||
1244+
sqe->len || sqe->addr3))
1245+
return -EINVAL;
1246+
1247+
ifq_idx = READ_ONCE(sqe->zcrx_ifq_idx);
1248+
if (ifq_idx != 0)
1249+
return -EINVAL;
1250+
zc->ifq = req->ctx->ifq;
1251+
if (!zc->ifq)
1252+
return -EINVAL;
1253+
1254+
zc->flags = READ_ONCE(sqe->ioprio);
1255+
zc->msg_flags = READ_ONCE(sqe->msg_flags);
1256+
if (zc->msg_flags)
1257+
return -EINVAL;
1258+
if (zc->flags & ~(IORING_RECVSEND_POLL_FIRST | IORING_RECV_MULTISHOT))
1259+
return -EINVAL;
1260+
/* multishot required */
1261+
if (!(zc->flags & IORING_RECV_MULTISHOT))
1262+
return -EINVAL;
1263+
/* All data completions are posted as aux CQEs. */
1264+
req->flags |= REQ_F_APOLL_MULTISHOT;
1265+
1266+
return 0;
1267+
}
1268+
1269+
int io_recvzc(struct io_kiocb *req, unsigned int issue_flags)
1270+
{
1271+
struct io_recvzc *zc = io_kiocb_to_cmd(req, struct io_recvzc);
1272+
struct socket *sock;
1273+
int ret;
1274+
1275+
if (!(req->flags & REQ_F_POLLED) &&
1276+
(zc->flags & IORING_RECVSEND_POLL_FIRST))
1277+
return -EAGAIN;
1278+
1279+
sock = sock_from_file(req->file);
1280+
if (unlikely(!sock))
1281+
return -ENOTSOCK;
1282+
1283+
ret = io_zcrx_recv(req, zc->ifq, sock, zc->msg_flags | MSG_DONTWAIT,
1284+
issue_flags);
1285+
if (unlikely(ret <= 0) && ret != -EAGAIN) {
1286+
if (ret == -ERESTARTSYS)
1287+
ret = -EINTR;
1288+
1289+
req_set_fail(req);
1290+
io_req_set_res(req, ret, 0);
1291+
1292+
if (issue_flags & IO_URING_F_MULTISHOT)
1293+
return IOU_STOP_MULTISHOT;
1294+
return IOU_OK;
1295+
}
1296+
1297+
if (issue_flags & IO_URING_F_MULTISHOT)
1298+
return IOU_ISSUE_SKIP_COMPLETE;
1299+
return -EAGAIN;
1300+
}
1301+
12301302
void io_send_zc_cleanup(struct io_kiocb *req)
12311303
{
12321304
struct io_sr_msg *zc = io_kiocb_to_cmd(req, struct io_sr_msg);

io_uring/opdef.c

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "waitid.h"
3838
#include "futex.h"
3939
#include "truncate.h"
40+
#include "zcrx.h"
4041

4142
static int io_no_issue(struct io_kiocb *req, unsigned int issue_flags)
4243
{
@@ -514,6 +515,18 @@ const struct io_issue_def io_issue_defs[] = {
514515
.async_size = sizeof(struct io_async_msghdr),
515516
#else
516517
.prep = io_eopnotsupp_prep,
518+
#endif
519+
},
520+
[IORING_OP_RECV_ZC] = {
521+
.needs_file = 1,
522+
.unbound_nonreg_file = 1,
523+
.pollin = 1,
524+
.ioprio = 1,
525+
#if defined(CONFIG_NET)
526+
.prep = io_recvzc_prep,
527+
.issue = io_recvzc,
528+
#else
529+
.prep = io_eopnotsupp_prep,
517530
#endif
518531
},
519532
};
@@ -745,6 +758,9 @@ const struct io_cold_def io_cold_defs[] = {
745758
[IORING_OP_LISTEN] = {
746759
.name = "LISTEN",
747760
},
761+
[IORING_OP_RECV_ZC] = {
762+
.name = "RECV_ZC",
763+
},
748764
};
749765

750766
const char *io_uring_get_opcode(u8 opcode)

io_uring/zcrx.c

Lines changed: 189 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
#include <net/netlink.h>
1414

1515
#include <trace/events/page_pool.h>
16+
#include <net/tcp.h>
17+
#include <net/rps.h>
1618

1719
#include <uapi/linux/io_uring.h>
1820

@@ -91,7 +93,12 @@ static void io_zcrx_sync_for_device(const struct page_pool *pool,
9193

9294
#define IO_RQ_MAX_ENTRIES 32768
9395

94-
__maybe_unused
96+
/* Bundle passed via read_descriptor_t to the tcp_read_sock() callback. */
struct io_zcrx_args {
	struct io_kiocb		*req;
	struct io_zcrx_ifq	*ifq;
	struct socket		*sock;
};
101+
95102
static const struct memory_provider_ops io_uring_pp_zc_ops;
96103

97104
static inline struct io_zcrx_area *io_zcrx_iov_to_area(const struct net_iov *niov)
@@ -118,6 +125,11 @@ static bool io_zcrx_put_niov_uref(struct net_iov *niov)
118125
return true;
119126
}
120127

128+
/* Take a userspace-visible reference on a net_iov handed to the application. */
static void io_zcrx_get_niov_uref(struct net_iov *niov)
{
	atomic_inc(io_get_user_counter(niov));
}
132+
121133
static int io_allocate_rbuf_ring(struct io_zcrx_ifq *ifq,
122134
struct io_uring_zcrx_ifq_reg *reg,
123135
struct io_uring_region_desc *rd)
@@ -614,3 +626,179 @@ static const struct memory_provider_ops io_uring_pp_zc_ops = {
614626
.nl_fill = io_pp_nl_fill,
615627
.uninstall = io_pp_uninstall,
616628
};
629+
630+
static bool io_zcrx_queue_cqe(struct io_kiocb *req, struct net_iov *niov,
631+
struct io_zcrx_ifq *ifq, int off, int len)
632+
{
633+
struct io_uring_zcrx_cqe *rcqe;
634+
struct io_zcrx_area *area;
635+
struct io_uring_cqe *cqe;
636+
u64 offset;
637+
638+
if (!io_defer_get_uncommited_cqe(req->ctx, &cqe))
639+
return false;
640+
641+
cqe->user_data = req->cqe.user_data;
642+
cqe->res = len;
643+
cqe->flags = IORING_CQE_F_MORE;
644+
645+
area = io_zcrx_iov_to_area(niov);
646+
offset = off + (net_iov_idx(niov) << PAGE_SHIFT);
647+
rcqe = (struct io_uring_zcrx_cqe *)(cqe + 1);
648+
rcqe->off = offset + ((u64)area->area_id << IORING_ZCRX_AREA_SHIFT);
649+
rcqe->__pad = 0;
650+
return true;
651+
}
652+
653+
static int io_zcrx_recv_frag(struct io_kiocb *req, struct io_zcrx_ifq *ifq,
654+
const skb_frag_t *frag, int off, int len)
655+
{
656+
struct net_iov *niov;
657+
658+
if (unlikely(!skb_frag_is_net_iov(frag)))
659+
return -EOPNOTSUPP;
660+
661+
niov = netmem_to_net_iov(frag->netmem);
662+
if (niov->pp->mp_ops != &io_uring_pp_zc_ops ||
663+
niov->pp->mp_priv != ifq)
664+
return -EFAULT;
665+
666+
if (!io_zcrx_queue_cqe(req, niov, ifq, off + skb_frag_off(frag), len))
667+
return -ENOSPC;
668+
669+
/*
670+
* Prevent it from being recycled while user is accessing it.
671+
* It has to be done before grabbing a user reference.
672+
*/
673+
page_pool_ref_netmem(net_iov_to_netmem(niov));
674+
io_zcrx_get_niov_uref(niov);
675+
return len;
676+
}
677+
678+
static int
679+
io_zcrx_recv_skb(read_descriptor_t *desc, struct sk_buff *skb,
680+
unsigned int offset, size_t len)
681+
{
682+
struct io_zcrx_args *args = desc->arg.data;
683+
struct io_zcrx_ifq *ifq = args->ifq;
684+
struct io_kiocb *req = args->req;
685+
struct sk_buff *frag_iter;
686+
unsigned start, start_off;
687+
int i, copy, end, off;
688+
int ret = 0;
689+
690+
start = skb_headlen(skb);
691+
start_off = offset;
692+
693+
if (offset < start)
694+
return -EOPNOTSUPP;
695+
696+
for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {
697+
const skb_frag_t *frag;
698+
699+
if (WARN_ON(start > offset + len))
700+
return -EFAULT;
701+
702+
frag = &skb_shinfo(skb)->frags[i];
703+
end = start + skb_frag_size(frag);
704+
705+
if (offset < end) {
706+
copy = end - offset;
707+
if (copy > len)
708+
copy = len;
709+
710+
off = offset - start;
711+
ret = io_zcrx_recv_frag(req, ifq, frag, off, copy);
712+
if (ret < 0)
713+
goto out;
714+
715+
offset += ret;
716+
len -= ret;
717+
if (len == 0 || ret != copy)
718+
goto out;
719+
}
720+
start = end;
721+
}
722+
723+
skb_walk_frags(skb, frag_iter) {
724+
if (WARN_ON(start > offset + len))
725+
return -EFAULT;
726+
727+
end = start + frag_iter->len;
728+
if (offset < end) {
729+
copy = end - offset;
730+
if (copy > len)
731+
copy = len;
732+
733+
off = offset - start;
734+
ret = io_zcrx_recv_skb(desc, frag_iter, off, copy);
735+
if (ret < 0)
736+
goto out;
737+
738+
offset += ret;
739+
len -= ret;
740+
if (len == 0 || ret != copy)
741+
goto out;
742+
}
743+
start = end;
744+
}
745+
746+
out:
747+
if (offset == start_off)
748+
return ret;
749+
return offset - start_off;
750+
}
751+
752+
static int io_zcrx_tcp_recvmsg(struct io_kiocb *req, struct io_zcrx_ifq *ifq,
753+
struct sock *sk, int flags,
754+
unsigned issue_flags)
755+
{
756+
struct io_zcrx_args args = {
757+
.req = req,
758+
.ifq = ifq,
759+
.sock = sk->sk_socket,
760+
};
761+
read_descriptor_t rd_desc = {
762+
.count = 1,
763+
.arg.data = &args,
764+
};
765+
int ret;
766+
767+
lock_sock(sk);
768+
ret = tcp_read_sock(sk, &rd_desc, io_zcrx_recv_skb);
769+
if (ret <= 0) {
770+
if (ret < 0 || sock_flag(sk, SOCK_DONE))
771+
goto out;
772+
if (sk->sk_err)
773+
ret = sock_error(sk);
774+
else if (sk->sk_shutdown & RCV_SHUTDOWN)
775+
goto out;
776+
else if (sk->sk_state == TCP_CLOSE)
777+
ret = -ENOTCONN;
778+
else
779+
ret = -EAGAIN;
780+
} else if (sock_flag(sk, SOCK_DONE)) {
781+
/* Make it to retry until it finally gets 0. */
782+
if (issue_flags & IO_URING_F_MULTISHOT)
783+
ret = IOU_REQUEUE;
784+
else
785+
ret = -EAGAIN;
786+
}
787+
out:
788+
release_sock(sk);
789+
return ret;
790+
}
791+
792+
int io_zcrx_recv(struct io_kiocb *req, struct io_zcrx_ifq *ifq,
793+
struct socket *sock, unsigned int flags,
794+
unsigned issue_flags)
795+
{
796+
struct sock *sk = sock->sk;
797+
const struct proto *prot = READ_ONCE(sk->sk_prot);
798+
799+
if (prot->recvmsg != tcp_recvmsg)
800+
return -EPROTONOSUPPORT;
801+
802+
sock_rps_record_flow(sk);
803+
return io_zcrx_tcp_recvmsg(req, ifq, sk, flags, issue_flags);
804+
}

0 commit comments

Comments
 (0)