229 lines
7.1 KiB
Diff
229 lines
7.1 KiB
Diff
From: Sebastian Andrzej Siewior <bigeasy@linutronix.de>
|
|
Date: Fri, 30 Oct 2015 11:59:07 +0100
|
|
Subject: ipc/msg: Implement lockless pipelined wakeups
|
|
Origin: https://www.kernel.org/pub/linux/kernel/projects/rt/4.8/older/patches-4.8.6-rt5.tar.xz
|
|
|
|
This patch moves the wakeup_process() invocation so it is not done under
|
|
the perm->lock by making use of a lockless wake_q. With this change, the
|
|
waiter is woken up once the message has been assigned and it does not
|
|
need to loop on SMP if the message points to NULL. In the signal case we
|
|
still need to check the pointer under the lock to verify the state.
|
|
|
|
This change should also avoid the introduction of preempt_disable() in
|
|
-RT which avoids a busy-loop which polls for the NULL -> !NULL
|
|
change if the waiter has a higher priority compared to the waker.
|
|
|
|
Cc: Davidlohr Bueso <dave@stgolabs.net>
|
|
Cc: Manfred Spraul <manfred@colorfullife.com>
|
|
Cc: Andrew Morton <akpm@linux-foundation.org>
|
|
Cc: George Spelvin <linux@horizon.com>
|
|
Cc: Thomas Gleixner <tglx@linutronix.de>
|
|
Cc: Peter Zijlstra <peterz@infradead.org>
|
|
Signed-off-by: Sebastian Andrzej Siewior <bigeasy@linutronix.de>
|
|
---
|
|
|
|
ipc/msg.c | 101 +++++++++++++++++---------------------------------------------
|
|
1 file changed, 28 insertions(+), 73 deletions(-)
|
|
|
|
--- a/ipc/msg.c
|
|
+++ b/ipc/msg.c
|
|
@@ -183,20 +183,14 @@ static void ss_wakeup(struct list_head *
|
|
}
|
|
}
|
|
|
|
-static void expunge_all(struct msg_queue *msq, int res)
|
|
+static void expunge_all(struct msg_queue *msq, int res,
|
|
+ struct wake_q_head *wake_q)
|
|
{
|
|
struct msg_receiver *msr, *t;
|
|
|
|
list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) {
|
|
- msr->r_msg = NULL; /* initialize expunge ordering */
|
|
- wake_up_process(msr->r_tsk);
|
|
- /*
|
|
- * Ensure that the wakeup is visible before setting r_msg as
|
|
- * the receiving end depends on it: either spinning on a nil,
|
|
- * or dealing with -EAGAIN cases. See lockless receive part 1
|
|
- * and 2 in do_msgrcv().
|
|
- */
|
|
- smp_wmb(); /* barrier (B) */
|
|
+
|
|
+ wake_q_add(wake_q, msr->r_tsk);
|
|
msr->r_msg = ERR_PTR(res);
|
|
}
|
|
}
|
|
@@ -213,11 +207,13 @@ static void freeque(struct ipc_namespace
|
|
{
|
|
struct msg_msg *msg, *t;
|
|
struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm);
|
|
+ WAKE_Q(wake_q);
|
|
|
|
- expunge_all(msq, -EIDRM);
|
|
+ expunge_all(msq, -EIDRM, &wake_q);
|
|
ss_wakeup(&msq->q_senders, 1);
|
|
msg_rmid(ns, msq);
|
|
ipc_unlock_object(&msq->q_perm);
|
|
+ wake_up_q(&wake_q);
|
|
rcu_read_unlock();
|
|
|
|
list_for_each_entry_safe(msg, t, &msq->q_messages, m_list) {
|
|
@@ -342,6 +338,7 @@ static int msgctl_down(struct ipc_namesp
|
|
struct kern_ipc_perm *ipcp;
|
|
struct msqid64_ds uninitialized_var(msqid64);
|
|
struct msg_queue *msq;
|
|
+ WAKE_Q(wake_q);
|
|
int err;
|
|
|
|
if (cmd == IPC_SET) {
|
|
@@ -389,7 +386,7 @@ static int msgctl_down(struct ipc_namesp
|
|
/* sleeping receivers might be excluded by
|
|
* stricter permissions.
|
|
*/
|
|
- expunge_all(msq, -EAGAIN);
|
|
+ expunge_all(msq, -EAGAIN, &wake_q);
|
|
/* sleeping senders might be able to send
|
|
* due to a larger queue size.
|
|
*/
|
|
@@ -402,6 +399,7 @@ static int msgctl_down(struct ipc_namesp
|
|
|
|
out_unlock0:
|
|
ipc_unlock_object(&msq->q_perm);
|
|
+ wake_up_q(&wake_q);
|
|
out_unlock1:
|
|
rcu_read_unlock();
|
|
out_up:
|
|
@@ -566,7 +564,8 @@ static int testmsg(struct msg_msg *msg,
|
|
return 0;
|
|
}
|
|
|
|
-static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
|
|
+static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg,
|
|
+ struct wake_q_head *wake_q)
|
|
{
|
|
struct msg_receiver *msr, *t;
|
|
|
|
@@ -577,27 +576,13 @@ static inline int pipelined_send(struct
|
|
|
|
list_del(&msr->r_list);
|
|
if (msr->r_maxsize < msg->m_ts) {
|
|
- /* initialize pipelined send ordering */
|
|
- msr->r_msg = NULL;
|
|
- wake_up_process(msr->r_tsk);
|
|
- /* barrier (B) see barrier comment below */
|
|
- smp_wmb();
|
|
+ wake_q_add(wake_q, msr->r_tsk);
|
|
msr->r_msg = ERR_PTR(-E2BIG);
|
|
} else {
|
|
- msr->r_msg = NULL;
|
|
msq->q_lrpid = task_pid_vnr(msr->r_tsk);
|
|
msq->q_rtime = get_seconds();
|
|
- wake_up_process(msr->r_tsk);
|
|
- /*
|
|
- * Ensure that the wakeup is visible before
|
|
- * setting r_msg, as the receiving can otherwise
|
|
- * exit - once r_msg is set, the receiver can
|
|
- * continue. See lockless receive part 1 and 2
|
|
- * in do_msgrcv(). Barrier (B).
|
|
- */
|
|
- smp_wmb();
|
|
+ wake_q_add(wake_q, msr->r_tsk);
|
|
msr->r_msg = msg;
|
|
-
|
|
return 1;
|
|
}
|
|
}
|
|
@@ -613,6 +598,7 @@ long do_msgsnd(int msqid, long mtype, vo
|
|
struct msg_msg *msg;
|
|
int err;
|
|
struct ipc_namespace *ns;
|
|
+ WAKE_Q(wake_q);
|
|
|
|
ns = current->nsproxy->ipc_ns;
|
|
|
|
@@ -698,7 +684,7 @@ long do_msgsnd(int msqid, long mtype, vo
|
|
msq->q_lspid = task_tgid_vnr(current);
|
|
msq->q_stime = get_seconds();
|
|
|
|
- if (!pipelined_send(msq, msg)) {
|
|
+ if (!pipelined_send(msq, msg, &wake_q)) {
|
|
/* no one is waiting for this message, enqueue it */
|
|
list_add_tail(&msg->m_list, &msq->q_messages);
|
|
msq->q_cbytes += msgsz;
|
|
@@ -712,6 +698,7 @@ long do_msgsnd(int msqid, long mtype, vo
|
|
|
|
out_unlock0:
|
|
ipc_unlock_object(&msq->q_perm);
|
|
+ wake_up_q(&wake_q);
|
|
out_unlock1:
|
|
rcu_read_unlock();
|
|
if (msg != NULL)
|
|
@@ -932,57 +919,25 @@ long do_msgrcv(int msqid, void __user *b
|
|
rcu_read_lock();
|
|
|
|
/* Lockless receive, part 2:
|
|
- * Wait until pipelined_send or expunge_all are outside of
|
|
- * wake_up_process(). There is a race with exit(), see
|
|
- * ipc/mqueue.c for the details. The correct serialization
|
|
- * ensures that a receiver cannot continue without the wakeup
|
|
- * being visibible _before_ setting r_msg:
|
|
+ * The work in pipelined_send() and expunge_all():
|
|
+ * - Set pointer to message
|
|
+ * - Queue the receiver task for later wakeup
|
|
+ * - Wake up the process after the lock is dropped.
|
|
*
|
|
- * CPU 0 CPU 1
|
|
- * <loop receiver>
|
|
- * smp_rmb(); (A) <-- pair -. <waker thread>
|
|
- * <load ->r_msg> | msr->r_msg = NULL;
|
|
- * | wake_up_process();
|
|
- * <continue> `------> smp_wmb(); (B)
|
|
- * msr->r_msg = msg;
|
|
- *
|
|
- * Where (A) orders the message value read and where (B) orders
|
|
- * the write to the r_msg -- done in both pipelined_send and
|
|
- * expunge_all.
|
|
+ * Should the process wake up before this wakeup (due to a
|
|
+ * signal) it will either see the message and continue …
|
|
*/
|
|
- for (;;) {
|
|
- /*
|
|
- * Pairs with writer barrier in pipelined_send
|
|
- * or expunge_all.
|
|
- */
|
|
- smp_rmb(); /* barrier (A) */
|
|
- msg = (struct msg_msg *)msr_d.r_msg;
|
|
- if (msg)
|
|
- break;
|
|
|
|
- /*
|
|
- * The cpu_relax() call is a compiler barrier
|
|
- * which forces everything in this loop to be
|
|
- * re-loaded.
|
|
- */
|
|
- cpu_relax();
|
|
- }
|
|
-
|
|
- /* Lockless receive, part 3:
|
|
- * If there is a message or an error then accept it without
|
|
- * locking.
|
|
- */
|
|
+ msg = (struct msg_msg *)msr_d.r_msg;
|
|
if (msg != ERR_PTR(-EAGAIN))
|
|
goto out_unlock1;
|
|
|
|
- /* Lockless receive, part 3:
|
|
- * Acquire the queue spinlock.
|
|
- */
|
|
+ /*
|
|
+ * … or see -EAGAIN, acquire the lock to check the message
|
|
+ * again.
|
|
+ */
|
|
ipc_lock_object(&msq->q_perm);
|
|
|
|
- /* Lockless receive, part 4:
|
|
- * Repeat test after acquiring the spinlock.
|
|
- */
|
|
msg = (struct msg_msg *)msr_d.r_msg;
|
|
if (msg != ERR_PTR(-EAGAIN))
|
|
goto out_unlock0;
|