OSDN Git Service

Merge branch 'master' of git://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux-2.6
[uclinux-h8/linux.git] / ipc / msg.c
index 2b6fdbb..66c4f56 100644 (file)
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -76,7 +76,7 @@ struct msg_sender {
 
 static inline struct msg_queue *msq_obtain_object(struct ipc_namespace *ns, int id)
 {
-       struct kern_ipc_perm *ipcp = ipc_obtain_object(&msg_ids(ns), id);
+       struct kern_ipc_perm *ipcp = ipc_obtain_object_idr(&msg_ids(ns), id);
 
        if (IS_ERR(ipcp))
                return ERR_CAST(ipcp);
@@ -196,7 +196,7 @@ static void expunge_all(struct msg_queue *msq, int res)
                 * or dealing with -EAGAIN cases. See lockless receive part 1
                 * and 2 in do_msgrcv().
                 */
-               smp_mb();
+               smp_wmb(); /* barrier (B) */
                msr->r_msg = ERR_PTR(res);
        }
 }
@@ -580,7 +580,8 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
                                /* initialize pipelined send ordering */
                                msr->r_msg = NULL;
                                wake_up_process(msr->r_tsk);
-                               smp_mb(); /* see barrier comment below */
+                               /* barrier (B) see barrier comment below */
+                               smp_wmb();
                                msr->r_msg = ERR_PTR(-E2BIG);
                        } else {
                                msr->r_msg = NULL;
@@ -589,11 +590,12 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
                                wake_up_process(msr->r_tsk);
                                /*
                                 * Ensure that the wakeup is visible before
-                                * setting r_msg, as the receiving end depends
-                                * on it. See lockless receive part 1 and 2 in
-                                * do_msgrcv().
+                                * setting r_msg, as the receiving can otherwise
+                                * exit - once r_msg is set, the receiver can
+                                * continue. See lockless receive part 1 and 2
+                                * in do_msgrcv(). Barrier (B).
                                 */
-                               smp_mb();
+                               smp_wmb();
                                msr->r_msg = msg;
 
                                return 1;
@@ -932,12 +934,38 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
                /* Lockless receive, part 2:
                 * Wait until pipelined_send or expunge_all are outside of
                 * wake_up_process(). There is a race with exit(), see
-                * ipc/mqueue.c for the details.
+                * ipc/mqueue.c for the details. The correct serialization
+                * ensures that a receiver cannot continue without the wakeup
+                * being visibible _before_ setting r_msg:
+                *
+                * CPU 0                             CPU 1
+                * <loop receiver>
+                *   smp_rmb(); (A) <-- pair -.      <waker thread>
+                *   <load ->r_msg>           |        msr->r_msg = NULL;
+                *                            |        wake_up_process();
+                * <continue>                 `------> smp_wmb(); (B)
+                *                                     msr->r_msg = msg;
+                *
+                * Where (A) orders the message value read and where (B) orders
+                * the write to the r_msg -- done in both pipelined_send and
+                * expunge_all.
                 */
-               msg = (struct msg_msg *)msr_d.r_msg;
-               while (msg == NULL) {
-                       cpu_relax();
+               for (;;) {
+                       /*
+                        * Pairs with writer barrier in pipelined_send
+                        * or expunge_all.
+                        */
+                       smp_rmb(); /* barrier (A) */
                        msg = (struct msg_msg *)msr_d.r_msg;
+                       if (msg)
+                               break;
+
+                       /*
+                        * The cpu_relax() call is a compiler barrier
+                        * which forces everything in this loop to be
+                        * re-loaded.
+                        */
+                       cpu_relax();
                }
 
                /* Lockless receive, part 3: