OSDN Git Service

SCSI: Fix NULL pointer dereference in runtime PM
[uclinux-h8/linux.git] / drivers / misc / mic / scif / scif_nodeqp.c
1 /*
2  * Intel MIC Platform Software Stack (MPSS)
3  *
4  * Copyright(c) 2014 Intel Corporation.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License, version 2, as
8  * published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  * General Public License for more details.
14  *
15  * Intel SCIF driver.
16  *
17  */
18 #include "../bus/scif_bus.h"
19 #include "scif_peer_bus.h"
20 #include "scif_main.h"
21 #include "scif_nodeqp.h"
22 #include "scif_map.h"
23
24 /*
25  ************************************************************************
26  * SCIF node Queue Pair (QP) setup flow:
27  *
28  * 1) SCIF driver gets probed with a scif_hw_dev via the scif_hw_bus
29  * 2) scif_setup_qp(..) allocates the local qp and calls
30  *      scif_setup_qp_connect(..) which allocates and maps the local
31  *      buffer for the inbound QP
32  * 3) The local node updates the device page with the DMA address of the QP
33  * 4) A delayed work is scheduled (qp_dwork) which periodically reads if
34  *      the peer node has updated its QP DMA address
35  * 5) Once a valid non zero address is found in the QP DMA address field
36  *      in the device page, the local node maps the remote node's QP,
37  *      updates its outbound QP and sends a SCIF_INIT message to the peer
38  * 6) The SCIF_INIT message is received by the peer node QP interrupt bottom
39  *      half handler by calling scif_init(..)
40  * 7) scif_init(..) registers a new SCIF peer node by calling
41  *      scif_peer_register_device(..) which signifies the addition of a new
42  *      SCIF node
43  * 8) On the mgmt node, P2P network setup/teardown is initiated if all the
44  *      remote nodes are online via scif_p2p_setup(..)
45  * 9) For P2P setup, the host maps the remote nodes' aperture and memory
46  *      bars and sends a SCIF_NODE_ADD message to both nodes
47  * 10) As part of scif_nodeadd, both nodes set up their local inbound
48  *      QPs and send a SCIF_NODE_ADD_ACK to the mgmt node
49  * 11) As part of scif_node_add_ack(..) the mgmt node forwards the
50  *      SCIF_NODE_ADD_ACK to the remote nodes
51  * 12) As part of scif_node_add_ack(..) the remote nodes update their
52  *      outbound QPs, make sure they can access memory on the remote node
53  *      and then add a new SCIF peer node by calling
54  *      scif_peer_register_device(..) which signifies the addition of a new
55  *      SCIF node.
56  * 13) The SCIF network is now established across all nodes.
57  *
58  ************************************************************************
59  * SCIF node QP teardown flow (initiated by non mgmt node):
60  *
61  * 1) SCIF driver gets a remove callback with a scif_hw_dev via the scif_hw_bus
62  * 2) The device page QP DMA address field is updated with 0x0
63  * 3) A non mgmt node now cleans up all local data structures and sends a
64  *      SCIF_EXIT message to the peer and waits for a SCIF_EXIT_ACK
65  * 4) As part of scif_exit(..) handling scif_disconnect_node(..) is called
66  * 5) scif_disconnect_node(..) sends a SCIF_NODE_REMOVE message to all the
67  *      peers and waits for a SCIF_NODE_REMOVE_ACK
68  * 6) As part of scif_node_remove(..) a remote node unregisters the peer
69  *      node from the SCIF network and sends a SCIF_NODE_REMOVE_ACK
70  * 7) When the mgmt node has received all the SCIF_NODE_REMOVE_ACKs
71  *      it sends itself a node remove message whose handling cleans up local
72  *      data structures and unregisters the peer node from the SCIF network
73  * 8) The mgmt node sends a SCIF_EXIT_ACK
74  * 9) Upon receipt of the SCIF_EXIT_ACK the node initiating the teardown
75  *      completes the SCIF remove routine
76  * 10) The SCIF network is now torn down for the node initiating the
77  *      teardown sequence
78  *
79  ************************************************************************
80  * SCIF node QP teardown flow (initiated by mgmt node):
81  *
82  * 1) SCIF driver gets a remove callback with a scif_hw_dev via the scif_hw_bus
83  * 2) The device page QP DMA address field is updated with 0x0
84  * 3) The mgmt node calls scif_disconnect_node(..)
85  * 4) scif_disconnect_node(..) sends a SCIF_NODE_REMOVE message to all the peers
86  *      and waits for a SCIF_NODE_REMOVE_ACK
87  * 5) As part of scif_node_remove(..) a remote node unregisters the peer
88  *      node from the SCIF network and sends a SCIF_NODE_REMOVE_ACK
89  * 6) When the mgmt node has received all the SCIF_NODE_REMOVE_ACKs
90  *      it unregisters the peer node from the SCIF network
91  * 7) The mgmt node sends a SCIF_EXIT message and waits for a SCIF_EXIT_ACK.
92  * 8) A non mgmt node upon receipt of a SCIF_EXIT message calls scif_stop(..)
93  *      which would clean up local data structures for all SCIF nodes and
94  *      then send a SCIF_EXIT_ACK back to the mgmt node
95  * 9) Upon receipt of the SCIF_EXIT_ACK the the mgmt node sends itself a node
96  *      remove message whose handling cleans up local data structures and
97  *      destroys any P2P mappings.
98  * 10) The SCIF hardware device for which a remove callback was received is now
99  *      disconnected from the SCIF network.
100  */
101 /*
102  * Initializes "local" data structures for the QP. Allocates the QP
103  * ring buffer (rb) and initializes the "in bound" queue.
104  */
105 int scif_setup_qp_connect(struct scif_qp *qp, dma_addr_t *qp_offset,
106                           int local_size, struct scif_dev *scifdev)
107 {
108         void *local_q = NULL;
109         int err = 0;
110         u32 tmp_rd = 0;
111
112         spin_lock_init(&qp->send_lock);
113         spin_lock_init(&qp->recv_lock);
114
115         local_q = kzalloc(local_size, GFP_KERNEL);
116         if (!local_q) {
117                 err = -ENOMEM;
118                 return err;
119         }
120         err = scif_map_single(&qp->local_buf, local_q, scifdev, local_size);
121         if (err)
122                 goto kfree;
123         /*
124          * To setup the inbound_q, the buffer lives locally, the read pointer
125          * is remote and the write pointer is local.
126          */
127         scif_rb_init(&qp->inbound_q,
128                      &tmp_rd,
129                      &qp->local_write,
130                      local_q, get_count_order(local_size));
131         /*
132          * The read pointer is NULL initially and it is unsafe to use the ring
133          * buffer til this changes!
134          */
135         qp->inbound_q.read_ptr = NULL;
136         err = scif_map_single(qp_offset, qp,
137                               scifdev, sizeof(struct scif_qp));
138         if (err)
139                 goto unmap;
140         qp->local_qp = *qp_offset;
141         return err;
142 unmap:
143         scif_unmap_single(qp->local_buf, scifdev, local_size);
144         qp->local_buf = 0;
145 kfree:
146         kfree(local_q);
147         return err;
148 }
149
150 /* When the other side has already done it's allocation, this is called */
151 int scif_setup_qp_accept(struct scif_qp *qp, dma_addr_t *qp_offset,
152                          dma_addr_t phys, int local_size,
153                          struct scif_dev *scifdev)
154 {
155         void *local_q;
156         void *remote_q;
157         struct scif_qp *remote_qp;
158         int remote_size;
159         int err = 0;
160
161         spin_lock_init(&qp->send_lock);
162         spin_lock_init(&qp->recv_lock);
163         /* Start by figuring out where we need to point */
164         remote_qp = scif_ioremap(phys, sizeof(struct scif_qp), scifdev);
165         if (!remote_qp)
166                 return -EIO;
167         qp->remote_qp = remote_qp;
168         if (qp->remote_qp->magic != SCIFEP_MAGIC) {
169                 err = -EIO;
170                 goto iounmap;
171         }
172         qp->remote_buf = remote_qp->local_buf;
173         remote_size = qp->remote_qp->inbound_q.size;
174         remote_q = scif_ioremap(qp->remote_buf, remote_size, scifdev);
175         if (!remote_q) {
176                 err = -EIO;
177                 goto iounmap;
178         }
179         qp->remote_qp->local_write = 0;
180         /*
181          * To setup the outbound_q, the buffer lives in remote memory,
182          * the read pointer is local, the write pointer is remote
183          */
184         scif_rb_init(&qp->outbound_q,
185                      &qp->local_read,
186                      &qp->remote_qp->local_write,
187                      remote_q,
188                      get_count_order(remote_size));
189         local_q = kzalloc(local_size, GFP_KERNEL);
190         if (!local_q) {
191                 err = -ENOMEM;
192                 goto iounmap_1;
193         }
194         err = scif_map_single(&qp->local_buf, local_q, scifdev, local_size);
195         if (err)
196                 goto kfree;
197         qp->remote_qp->local_read = 0;
198         /*
199          * To setup the inbound_q, the buffer lives locally, the read pointer
200          * is remote and the write pointer is local
201          */
202         scif_rb_init(&qp->inbound_q,
203                      &qp->remote_qp->local_read,
204                      &qp->local_write,
205                      local_q, get_count_order(local_size));
206         err = scif_map_single(qp_offset, qp, scifdev,
207                               sizeof(struct scif_qp));
208         if (err)
209                 goto unmap;
210         qp->local_qp = *qp_offset;
211         return err;
212 unmap:
213         scif_unmap_single(qp->local_buf, scifdev, local_size);
214         qp->local_buf = 0;
215 kfree:
216         kfree(local_q);
217 iounmap_1:
218         scif_iounmap(remote_q, remote_size, scifdev);
219         qp->outbound_q.rb_base = NULL;
220 iounmap:
221         scif_iounmap(qp->remote_qp, sizeof(struct scif_qp), scifdev);
222         qp->remote_qp = NULL;
223         return err;
224 }
225
226 int scif_setup_qp_connect_response(struct scif_dev *scifdev,
227                                    struct scif_qp *qp, u64 payload)
228 {
229         int err = 0;
230         void *r_buf;
231         int remote_size;
232         phys_addr_t tmp_phys;
233
234         qp->remote_qp = scif_ioremap(payload, sizeof(struct scif_qp), scifdev);
235
236         if (!qp->remote_qp) {
237                 err = -ENOMEM;
238                 goto error;
239         }
240
241         if (qp->remote_qp->magic != SCIFEP_MAGIC) {
242                 dev_err(&scifdev->sdev->dev,
243                         "SCIFEP_MAGIC mismatch between self %d remote %d\n",
244                         scif_dev[scif_info.nodeid].node, scifdev->node);
245                 err = -ENODEV;
246                 goto error;
247         }
248
249         tmp_phys = qp->remote_qp->local_buf;
250         remote_size = qp->remote_qp->inbound_q.size;
251         r_buf = scif_ioremap(tmp_phys, remote_size, scifdev);
252
253         if (!r_buf)
254                 return -EIO;
255
256         qp->local_read = 0;
257         scif_rb_init(&qp->outbound_q,
258                      &qp->local_read,
259                      &qp->remote_qp->local_write,
260                      r_buf,
261                      get_count_order(remote_size));
262         /*
263          * resetup the inbound_q now that we know where the
264          * inbound_read really is.
265          */
266         scif_rb_init(&qp->inbound_q,
267                      &qp->remote_qp->local_read,
268                      &qp->local_write,
269                      qp->inbound_q.rb_base,
270                      get_count_order(qp->inbound_q.size));
271 error:
272         return err;
273 }
274
275 static __always_inline void
276 scif_send_msg_intr(struct scif_dev *scifdev)
277 {
278         struct scif_hw_dev *sdev = scifdev->sdev;
279
280         if (scifdev_is_p2p(scifdev))
281                 sdev->hw_ops->send_p2p_intr(sdev, scifdev->rdb, &scifdev->mmio);
282         else
283                 sdev->hw_ops->send_intr(sdev, scifdev->rdb);
284 }
285
286 int scif_qp_response(phys_addr_t phys, struct scif_dev *scifdev)
287 {
288         int err = 0;
289         struct scifmsg msg;
290
291         err = scif_setup_qp_connect_response(scifdev, scifdev->qpairs, phys);
292         if (!err) {
293                 /*
294                  * Now that everything is setup and mapped, we're ready
295                  * to tell the peer about our queue's location
296                  */
297                 msg.uop = SCIF_INIT;
298                 msg.dst.node = scifdev->node;
299                 err = scif_nodeqp_send(scifdev, &msg);
300         }
301         return err;
302 }
303
304 void scif_send_exit(struct scif_dev *scifdev)
305 {
306         struct scifmsg msg;
307         int ret;
308
309         scifdev->exit = OP_IN_PROGRESS;
310         msg.uop = SCIF_EXIT;
311         msg.src.node = scif_info.nodeid;
312         msg.dst.node = scifdev->node;
313         ret = scif_nodeqp_send(scifdev, &msg);
314         if (ret)
315                 goto done;
316         /* Wait for a SCIF_EXIT_ACK message */
317         wait_event_timeout(scif_info.exitwq, scifdev->exit == OP_COMPLETED,
318                            SCIF_NODE_ALIVE_TIMEOUT);
319 done:
320         scifdev->exit = OP_IDLE;
321 }
322
323 int scif_setup_qp(struct scif_dev *scifdev)
324 {
325         int err = 0;
326         int local_size;
327         struct scif_qp *qp;
328
329         local_size = SCIF_NODE_QP_SIZE;
330
331         qp = kzalloc(sizeof(*qp), GFP_KERNEL);
332         if (!qp) {
333                 err = -ENOMEM;
334                 return err;
335         }
336         qp->magic = SCIFEP_MAGIC;
337         scifdev->qpairs = qp;
338         err = scif_setup_qp_connect(qp, &scifdev->qp_dma_addr,
339                                     local_size, scifdev);
340         if (err)
341                 goto free_qp;
342         /*
343          * We're as setup as we can be. The inbound_q is setup, w/o a usable
344          * outbound q.  When we get a message, the read_ptr will be updated,
345          * and we will pull the message.
346          */
347         return err;
348 free_qp:
349         kfree(scifdev->qpairs);
350         scifdev->qpairs = NULL;
351         return err;
352 }
353
354 static void scif_p2p_freesg(struct scatterlist *sg)
355 {
356         kfree(sg);
357 }
358
359 static struct scatterlist *
360 scif_p2p_setsg(void __iomem *va, int page_size, int page_cnt)
361 {
362         struct scatterlist *sg;
363         struct page *page;
364         int i;
365
366         sg = kcalloc(page_cnt, sizeof(struct scatterlist), GFP_KERNEL);
367         if (!sg)
368                 return NULL;
369         sg_init_table(sg, page_cnt);
370         for (i = 0; i < page_cnt; i++) {
371                 page = vmalloc_to_page((void __force *)va);
372                 if (!page)
373                         goto p2p_sg_err;
374                 sg_set_page(&sg[i], page, page_size, 0);
375                 va += page_size;
376         }
377         return sg;
378 p2p_sg_err:
379         kfree(sg);
380         return NULL;
381 }
382
383 /* Init p2p mappings required to access peerdev from scifdev */
384 static struct scif_p2p_info *
385 scif_init_p2p_info(struct scif_dev *scifdev, struct scif_dev *peerdev)
386 {
387         struct scif_p2p_info *p2p;
388         int num_mmio_pages, num_aper_pages, sg_page_shift, err, num_aper_chunks;
389         struct scif_hw_dev *psdev = peerdev->sdev;
390         struct scif_hw_dev *sdev = scifdev->sdev;
391
392         num_mmio_pages = psdev->mmio->len >> PAGE_SHIFT;
393         num_aper_pages = psdev->aper->len >> PAGE_SHIFT;
394
395         p2p = kzalloc(sizeof(*p2p), GFP_KERNEL);
396         if (!p2p)
397                 return NULL;
398         p2p->ppi_sg[SCIF_PPI_MMIO] = scif_p2p_setsg(psdev->mmio->va,
399                                                     PAGE_SIZE, num_mmio_pages);
400         if (!p2p->ppi_sg[SCIF_PPI_MMIO])
401                 goto free_p2p;
402         p2p->sg_nentries[SCIF_PPI_MMIO] = num_mmio_pages;
403         sg_page_shift = get_order(min(psdev->aper->len, (u64)(1 << 30)));
404         num_aper_chunks = num_aper_pages >> (sg_page_shift - PAGE_SHIFT);
405         p2p->ppi_sg[SCIF_PPI_APER] = scif_p2p_setsg(psdev->aper->va,
406                                                     1 << sg_page_shift,
407                                                     num_aper_chunks);
408         p2p->sg_nentries[SCIF_PPI_APER] = num_aper_chunks;
409         err = dma_map_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
410                          num_mmio_pages, PCI_DMA_BIDIRECTIONAL);
411         if (err != num_mmio_pages)
412                 goto scif_p2p_free;
413         err = dma_map_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
414                          num_aper_chunks, PCI_DMA_BIDIRECTIONAL);
415         if (err != num_aper_chunks)
416                 goto dma_unmap;
417         p2p->ppi_da[SCIF_PPI_MMIO] = sg_dma_address(p2p->ppi_sg[SCIF_PPI_MMIO]);
418         p2p->ppi_da[SCIF_PPI_APER] = sg_dma_address(p2p->ppi_sg[SCIF_PPI_APER]);
419         p2p->ppi_len[SCIF_PPI_MMIO] = num_mmio_pages;
420         p2p->ppi_len[SCIF_PPI_APER] = num_aper_pages;
421         p2p->ppi_peer_id = peerdev->node;
422         return p2p;
423 dma_unmap:
424         dma_unmap_sg(&sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
425                      p2p->sg_nentries[SCIF_PPI_MMIO], DMA_BIDIRECTIONAL);
426 scif_p2p_free:
427         scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
428         scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
429 free_p2p:
430         kfree(p2p);
431         return NULL;
432 }
433
434 /**
435  * scif_node_connect: Respond to SCIF_NODE_CONNECT interrupt message
436  * @dst: Destination node
437  *
438  * Connect the src and dst node by setting up the p2p connection
439  * between them. Management node here acts like a proxy.
440  */
441 static void scif_node_connect(struct scif_dev *scifdev, int dst)
442 {
443         struct scif_dev *dev_j = scifdev;
444         struct scif_dev *dev_i = NULL;
445         struct scif_p2p_info *p2p_ij = NULL;    /* bus addr for j from i */
446         struct scif_p2p_info *p2p_ji = NULL;    /* bus addr for i from j */
447         struct scif_p2p_info *p2p;
448         struct list_head *pos, *tmp;
449         struct scifmsg msg;
450         int err;
451         u64 tmppayload;
452
453         if (dst < 1 || dst > scif_info.maxid)
454                 return;
455
456         dev_i = &scif_dev[dst];
457
458         if (!_scifdev_alive(dev_i))
459                 return;
460         /*
461          * If the p2p connection is already setup or in the process of setting
462          * up then just ignore this request. The requested node will get
463          * informed by SCIF_NODE_ADD_ACK or SCIF_NODE_ADD_NACK
464          */
465         if (!list_empty(&dev_i->p2p)) {
466                 list_for_each_safe(pos, tmp, &dev_i->p2p) {
467                         p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
468                         if (p2p->ppi_peer_id == dev_j->node)
469                                 return;
470                 }
471         }
472         p2p_ij = scif_init_p2p_info(dev_i, dev_j);
473         if (!p2p_ij)
474                 return;
475         p2p_ji = scif_init_p2p_info(dev_j, dev_i);
476         if (!p2p_ji)
477                 return;
478         list_add_tail(&p2p_ij->ppi_list, &dev_i->p2p);
479         list_add_tail(&p2p_ji->ppi_list, &dev_j->p2p);
480
481         /*
482          * Send a SCIF_NODE_ADD to dev_i, pass it its bus address
483          * as seen from dev_j
484          */
485         msg.uop = SCIF_NODE_ADD;
486         msg.src.node = dev_j->node;
487         msg.dst.node = dev_i->node;
488
489         msg.payload[0] = p2p_ji->ppi_da[SCIF_PPI_APER];
490         msg.payload[1] = p2p_ij->ppi_da[SCIF_PPI_MMIO];
491         msg.payload[2] = p2p_ij->ppi_da[SCIF_PPI_APER];
492         msg.payload[3] = p2p_ij->ppi_len[SCIF_PPI_APER] << PAGE_SHIFT;
493
494         err = scif_nodeqp_send(dev_i,  &msg);
495         if (err) {
496                 dev_err(&scifdev->sdev->dev,
497                         "%s %d error %d\n", __func__, __LINE__, err);
498                 return;
499         }
500
501         /* Same as above but to dev_j */
502         msg.uop = SCIF_NODE_ADD;
503         msg.src.node = dev_i->node;
504         msg.dst.node = dev_j->node;
505
506         tmppayload = msg.payload[0];
507         msg.payload[0] = msg.payload[2];
508         msg.payload[2] = tmppayload;
509         msg.payload[1] = p2p_ji->ppi_da[SCIF_PPI_MMIO];
510         msg.payload[3] = p2p_ji->ppi_len[SCIF_PPI_APER] << PAGE_SHIFT;
511
512         scif_nodeqp_send(dev_j, &msg);
513 }
514
515 static void scif_p2p_setup(void)
516 {
517         int i, j;
518
519         if (!scif_info.p2p_enable)
520                 return;
521
522         for (i = 1; i <= scif_info.maxid; i++)
523                 if (!_scifdev_alive(&scif_dev[i]))
524                         return;
525
526         for (i = 1; i <= scif_info.maxid; i++) {
527                 for (j = 1; j <= scif_info.maxid; j++) {
528                         struct scif_dev *scifdev = &scif_dev[i];
529
530                         if (i == j)
531                                 continue;
532                         scif_node_connect(scifdev, j);
533                 }
534         }
535 }
536
537 void scif_qp_response_ack(struct work_struct *work)
538 {
539         struct scif_dev *scifdev = container_of(work, struct scif_dev,
540                                                 init_msg_work);
541         struct scif_peer_dev *spdev;
542
543         /* Drop the INIT message if it has already been received */
544         if (_scifdev_alive(scifdev))
545                 return;
546
547         spdev = scif_peer_register_device(scifdev);
548         if (IS_ERR(spdev))
549                 return;
550
551         if (scif_is_mgmt_node()) {
552                 mutex_lock(&scif_info.conflock);
553                 scif_p2p_setup();
554                 mutex_unlock(&scif_info.conflock);
555         }
556 }
557
558 static char *message_types[] = {"BAD",
559                                 "INIT",
560                                 "EXIT",
561                                 "SCIF_EXIT_ACK",
562                                 "SCIF_NODE_ADD",
563                                 "SCIF_NODE_ADD_ACK",
564                                 "SCIF_NODE_ADD_NACK",
565                                 "REMOVE_NODE",
566                                 "REMOVE_NODE_ACK",
567                                 "CNCT_REQ",
568                                 "CNCT_GNT",
569                                 "CNCT_GNTACK",
570                                 "CNCT_GNTNACK",
571                                 "CNCT_REJ",
572                                 "DISCNCT",
573                                 "DISCNT_ACK",
574                                 "CLIENT_SENT",
575                                 "CLIENT_RCVD",
576                                 "SCIF_GET_NODE_INFO"};
577
578 static void
579 scif_display_message(struct scif_dev *scifdev, struct scifmsg *msg,
580                      const char *label)
581 {
582         if (!scif_info.en_msg_log)
583                 return;
584         if (msg->uop > SCIF_MAX_MSG) {
585                 dev_err(&scifdev->sdev->dev,
586                         "%s: unknown msg type %d\n", label, msg->uop);
587                 return;
588         }
589         dev_info(&scifdev->sdev->dev,
590                  "%s: msg type %s, src %d:%d, dest %d:%d payload 0x%llx:0x%llx:0x%llx:0x%llx\n",
591                  label, message_types[msg->uop], msg->src.node, msg->src.port,
592                  msg->dst.node, msg->dst.port, msg->payload[0], msg->payload[1],
593                  msg->payload[2], msg->payload[3]);
594 }
595
596 int _scif_nodeqp_send(struct scif_dev *scifdev, struct scifmsg *msg)
597 {
598         struct scif_qp *qp = scifdev->qpairs;
599         int err = -ENOMEM, loop_cnt = 0;
600
601         scif_display_message(scifdev, msg, "Sent");
602         if (!qp) {
603                 err = -EINVAL;
604                 goto error;
605         }
606         spin_lock(&qp->send_lock);
607
608         while ((err = scif_rb_write(&qp->outbound_q,
609                                     msg, sizeof(struct scifmsg)))) {
610                 mdelay(1);
611 #define SCIF_NODEQP_SEND_TO_MSEC (3 * 1000)
612                 if (loop_cnt++ > (SCIF_NODEQP_SEND_TO_MSEC)) {
613                         err = -ENODEV;
614                         break;
615                 }
616         }
617         if (!err)
618                 scif_rb_commit(&qp->outbound_q);
619         spin_unlock(&qp->send_lock);
620         if (!err) {
621                 if (scifdev_self(scifdev))
622                         /*
623                          * For loopback we need to emulate an interrupt by
624                          * queuing work for the queue handling real node
625                          * Qp interrupts.
626                          */
627                         queue_work(scifdev->intr_wq, &scifdev->intr_bh);
628                 else
629                         scif_send_msg_intr(scifdev);
630         }
631 error:
632         if (err)
633                 dev_dbg(&scifdev->sdev->dev,
634                         "%s %d error %d uop %d\n",
635                          __func__, __LINE__, err, msg->uop);
636         return err;
637 }
638
639 /**
640  * scif_nodeqp_send - Send a message on the node queue pair
641  * @scifdev: Scif Device.
642  * @msg: The message to be sent.
643  */
644 int scif_nodeqp_send(struct scif_dev *scifdev, struct scifmsg *msg)
645 {
646         int err;
647         struct device *spdev = NULL;
648
649         if (msg->uop > SCIF_EXIT_ACK) {
650                 /* Dont send messages once the exit flow has begun */
651                 if (OP_IDLE != scifdev->exit)
652                         return -ENODEV;
653                 spdev = scif_get_peer_dev(scifdev);
654                 if (IS_ERR(spdev)) {
655                         err = PTR_ERR(spdev);
656                         return err;
657                 }
658         }
659         err = _scif_nodeqp_send(scifdev, msg);
660         if (msg->uop > SCIF_EXIT_ACK)
661                 scif_put_peer_dev(spdev);
662         return err;
663 }
664
665 /*
666  * scif_misc_handler:
667  *
668  * Work queue handler for servicing miscellaneous SCIF tasks.
669  * Examples include:
670  * 1) Cleanup of zombie endpoints.
671  */
672 void scif_misc_handler(struct work_struct *work)
673 {
674         scif_cleanup_zombie_epd();
675 }
676
677 /**
678  * scif_init() - Respond to SCIF_INIT interrupt message
679  * @scifdev:    Remote SCIF device node
680  * @msg:        Interrupt message
681  */
682 static __always_inline void
683 scif_init(struct scif_dev *scifdev, struct scifmsg *msg)
684 {
685         /*
686          * Allow the thread waiting for device page updates for the peer QP DMA
687          * address to complete initializing the inbound_q.
688          */
689         flush_delayed_work(&scifdev->qp_dwork);
690         /*
691          * Delegate the peer device registration to a workqueue, otherwise if
692          * SCIF client probe (called during peer device registration) calls
693          * scif_connect(..), it will block the message processing thread causing
694          * a deadlock.
695          */
696         schedule_work(&scifdev->init_msg_work);
697 }
698
699 /**
700  * scif_exit() - Respond to SCIF_EXIT interrupt message
701  * @scifdev:    Remote SCIF device node
702  * @msg:        Interrupt message
703  *
704  * This function stops the SCIF interface for the node which sent
705  * the SCIF_EXIT message and starts waiting for that node to
706  * resetup the queue pair again.
707  */
708 static __always_inline void
709 scif_exit(struct scif_dev *scifdev, struct scifmsg *unused)
710 {
711         scifdev->exit_ack_pending = true;
712         if (scif_is_mgmt_node())
713                 scif_disconnect_node(scifdev->node, false);
714         else
715                 scif_stop(scifdev);
716         schedule_delayed_work(&scifdev->qp_dwork,
717                               msecs_to_jiffies(1000));
718 }
719
720 /**
721  * scif_exitack() - Respond to SCIF_EXIT_ACK interrupt message
722  * @scifdev:    Remote SCIF device node
723  * @msg:        Interrupt message
724  *
725  */
726 static __always_inline void
727 scif_exit_ack(struct scif_dev *scifdev, struct scifmsg *unused)
728 {
729         scifdev->exit = OP_COMPLETED;
730         wake_up(&scif_info.exitwq);
731 }
732
733 /**
734  * scif_node_add() - Respond to SCIF_NODE_ADD interrupt message
735  * @scifdev:    Remote SCIF device node
736  * @msg:        Interrupt message
737  *
738  * When the mgmt node driver has finished initializing a MIC node queue pair it
739  * marks the node as online. It then looks for all currently online MIC cards
740  * and send a SCIF_NODE_ADD message to identify the ID of the new card for
741  * peer to peer initialization
742  *
743  * The local node allocates its incoming queue and sends its address in the
744  * SCIF_NODE_ADD_ACK message back to the mgmt node, the mgmt node "reflects"
745  * this message to the new node
746  */
747 static __always_inline void
748 scif_node_add(struct scif_dev *scifdev, struct scifmsg *msg)
749 {
750         struct scif_dev *newdev;
751         dma_addr_t qp_offset;
752         int qp_connect;
753         struct scif_hw_dev *sdev;
754
755         dev_dbg(&scifdev->sdev->dev,
756                 "Scifdev %d:%d received NODE_ADD msg for node %d\n",
757                 scifdev->node, msg->dst.node, msg->src.node);
758         dev_dbg(&scifdev->sdev->dev,
759                 "Remote address for this node's aperture %llx\n",
760                 msg->payload[0]);
761         newdev = &scif_dev[msg->src.node];
762         newdev->node = msg->src.node;
763         newdev->sdev = scif_dev[SCIF_MGMT_NODE].sdev;
764         sdev = newdev->sdev;
765
766         if (scif_setup_intr_wq(newdev)) {
767                 dev_err(&scifdev->sdev->dev,
768                         "failed to setup interrupts for %d\n", msg->src.node);
769                 goto interrupt_setup_error;
770         }
771         newdev->mmio.va = ioremap_nocache(msg->payload[1], sdev->mmio->len);
772         if (!newdev->mmio.va) {
773                 dev_err(&scifdev->sdev->dev,
774                         "failed to map mmio for %d\n", msg->src.node);
775                 goto mmio_map_error;
776         }
777         newdev->qpairs = kzalloc(sizeof(*newdev->qpairs), GFP_KERNEL);
778         if (!newdev->qpairs)
779                 goto qp_alloc_error;
780         /*
781          * Set the base address of the remote node's memory since it gets
782          * added to qp_offset
783          */
784         newdev->base_addr = msg->payload[0];
785
786         qp_connect = scif_setup_qp_connect(newdev->qpairs, &qp_offset,
787                                            SCIF_NODE_QP_SIZE, newdev);
788         if (qp_connect) {
789                 dev_err(&scifdev->sdev->dev,
790                         "failed to setup qp_connect %d\n", qp_connect);
791                 goto qp_connect_error;
792         }
793
794         newdev->db = sdev->hw_ops->next_db(sdev);
795         newdev->cookie = sdev->hw_ops->request_irq(sdev, scif_intr_handler,
796                                                    "SCIF_INTR", newdev,
797                                                    newdev->db);
798         if (IS_ERR(newdev->cookie))
799                 goto qp_connect_error;
800         newdev->qpairs->magic = SCIFEP_MAGIC;
801         newdev->qpairs->qp_state = SCIF_QP_OFFLINE;
802
803         msg->uop = SCIF_NODE_ADD_ACK;
804         msg->dst.node = msg->src.node;
805         msg->src.node = scif_info.nodeid;
806         msg->payload[0] = qp_offset;
807         msg->payload[2] = newdev->db;
808         scif_nodeqp_send(&scif_dev[SCIF_MGMT_NODE], msg);
809         return;
810 qp_connect_error:
811         kfree(newdev->qpairs);
812         newdev->qpairs = NULL;
813 qp_alloc_error:
814         iounmap(newdev->mmio.va);
815         newdev->mmio.va = NULL;
816 mmio_map_error:
817 interrupt_setup_error:
818         dev_err(&scifdev->sdev->dev,
819                 "node add failed for node %d\n", msg->src.node);
820         msg->uop = SCIF_NODE_ADD_NACK;
821         msg->dst.node = msg->src.node;
822         msg->src.node = scif_info.nodeid;
823         scif_nodeqp_send(&scif_dev[SCIF_MGMT_NODE], msg);
824 }
825
826 void scif_poll_qp_state(struct work_struct *work)
827 {
828 #define SCIF_NODE_QP_RETRY 100
829 #define SCIF_NODE_QP_TIMEOUT 100
830         struct scif_dev *peerdev = container_of(work, struct scif_dev,
831                                                         p2p_dwork.work);
832         struct scif_qp *qp = &peerdev->qpairs[0];
833
834         if (qp->qp_state != SCIF_QP_ONLINE ||
835             qp->remote_qp->qp_state != SCIF_QP_ONLINE) {
836                 if (peerdev->p2p_retry++ == SCIF_NODE_QP_RETRY) {
837                         dev_err(&peerdev->sdev->dev,
838                                 "Warning: QP check timeout with state %d\n",
839                                 qp->qp_state);
840                         goto timeout;
841                 }
842                 schedule_delayed_work(&peerdev->p2p_dwork,
843                                       msecs_to_jiffies(SCIF_NODE_QP_TIMEOUT));
844                 return;
845         }
846         scif_peer_register_device(peerdev);
847         return;
848 timeout:
849         dev_err(&peerdev->sdev->dev,
850                 "%s %d remote node %d offline,  state = 0x%x\n",
851                 __func__, __LINE__, peerdev->node, qp->qp_state);
852         qp->remote_qp->qp_state = SCIF_QP_OFFLINE;
853         scif_cleanup_scifdev(peerdev);
854 }
855
856 /**
857  * scif_node_add_ack() - Respond to SCIF_NODE_ADD_ACK interrupt message
858  * @scifdev:    Remote SCIF device node
859  * @msg:        Interrupt message
860  *
861  * After a MIC node receives the SCIF_NODE_ADD_ACK message it send this
862  * message to the mgmt node to confirm the sequence is finished.
863  *
864  */
865 static __always_inline void
866 scif_node_add_ack(struct scif_dev *scifdev, struct scifmsg *msg)
867 {
868         struct scif_dev *peerdev;
869         struct scif_qp *qp;
870         struct scif_dev *dst_dev = &scif_dev[msg->dst.node];
871
872         dev_dbg(&scifdev->sdev->dev,
873                 "Scifdev %d received SCIF_NODE_ADD_ACK msg src %d dst %d\n",
874                 scifdev->node, msg->src.node, msg->dst.node);
875         dev_dbg(&scifdev->sdev->dev,
876                 "payload %llx %llx %llx %llx\n", msg->payload[0],
877                 msg->payload[1], msg->payload[2], msg->payload[3]);
878         if (scif_is_mgmt_node()) {
879                 /*
880                  * the lock serializes with scif_qp_response_ack. The mgmt node
881                  * is forwarding the NODE_ADD_ACK message from src to dst we
882                  * need to make sure that the dst has already received a
883                  * NODE_ADD for src and setup its end of the qp to dst
884                  */
885                 mutex_lock(&scif_info.conflock);
886                 msg->payload[1] = scif_info.maxid;
887                 scif_nodeqp_send(dst_dev, msg);
888                 mutex_unlock(&scif_info.conflock);
889                 return;
890         }
891         peerdev = &scif_dev[msg->src.node];
892         peerdev->sdev = scif_dev[SCIF_MGMT_NODE].sdev;
893         peerdev->node = msg->src.node;
894
895         qp = &peerdev->qpairs[0];
896
897         if ((scif_setup_qp_connect_response(peerdev, &peerdev->qpairs[0],
898                                             msg->payload[0])))
899                 goto local_error;
900         peerdev->rdb = msg->payload[2];
901         qp->remote_qp->qp_state = SCIF_QP_ONLINE;
902         schedule_delayed_work(&peerdev->p2p_dwork, 0);
903         return;
904 local_error:
905         scif_cleanup_scifdev(peerdev);
906 }
907
908 /**
909  * scif_node_add_nack: Respond to SCIF_NODE_ADD_NACK interrupt message
910  * @msg:        Interrupt message
911  *
912  * SCIF_NODE_ADD failed, so inform the waiting wq.
913  */
914 static __always_inline void
915 scif_node_add_nack(struct scif_dev *scifdev, struct scifmsg *msg)
916 {
917         if (scif_is_mgmt_node()) {
918                 struct scif_dev *dst_dev = &scif_dev[msg->dst.node];
919
920                 dev_dbg(&scifdev->sdev->dev,
921                         "SCIF_NODE_ADD_NACK received from %d\n", scifdev->node);
922                 scif_nodeqp_send(dst_dev, msg);
923         }
924 }
925
926 /*
927  * scif_node_remove: Handle SCIF_NODE_REMOVE message
928  * @msg: Interrupt message
929  *
930  * Handle node removal.
931  */
932 static __always_inline void
933 scif_node_remove(struct scif_dev *scifdev, struct scifmsg *msg)
934 {
935         int node = msg->payload[0];
936         struct scif_dev *scdev = &scif_dev[node];
937
938         scdev->node_remove_ack_pending = true;
939         scif_handle_remove_node(node);
940 }
941
942 /*
943  * scif_node_remove_ack: Handle SCIF_NODE_REMOVE_ACK message
944  * @msg: Interrupt message
945  *
946  * The peer has acked a SCIF_NODE_REMOVE message.
947  */
948 static __always_inline void
949 scif_node_remove_ack(struct scif_dev *scifdev, struct scifmsg *msg)
950 {
951         struct scif_dev *sdev = &scif_dev[msg->payload[0]];
952
953         atomic_inc(&sdev->disconn_rescnt);
954         wake_up(&sdev->disconn_wq);
955 }
956
957 /**
958  * scif_get_node_info: Respond to SCIF_GET_NODE_INFO interrupt message
959  * @msg:        Interrupt message
960  *
961  * Retrieve node info i.e maxid and total from the mgmt node.
962  */
963 static __always_inline void
964 scif_get_node_info_resp(struct scif_dev *scifdev, struct scifmsg *msg)
965 {
966         if (scif_is_mgmt_node()) {
967                 swap(msg->dst.node, msg->src.node);
968                 mutex_lock(&scif_info.conflock);
969                 msg->payload[1] = scif_info.maxid;
970                 msg->payload[2] = scif_info.total;
971                 mutex_unlock(&scif_info.conflock);
972                 scif_nodeqp_send(scifdev, msg);
973         } else {
974                 struct completion *node_info =
975                         (struct completion *)msg->payload[3];
976
977                 mutex_lock(&scif_info.conflock);
978                 scif_info.maxid = msg->payload[1];
979                 scif_info.total = msg->payload[2];
980                 complete_all(node_info);
981                 mutex_unlock(&scif_info.conflock);
982         }
983 }
984
985 static void
986 scif_msg_unknown(struct scif_dev *scifdev, struct scifmsg *msg)
987 {
988         /* Bogus Node Qp Message? */
989         dev_err(&scifdev->sdev->dev,
990                 "Unknown message 0x%xn scifdev->node 0x%x\n",
991                 msg->uop, scifdev->node);
992 }
993
994 static void (*scif_intr_func[SCIF_MAX_MSG + 1])
995             (struct scif_dev *, struct scifmsg *msg) = {
996         scif_msg_unknown,       /* Error */
997         scif_init,              /* SCIF_INIT */
998         scif_exit,              /* SCIF_EXIT */
999         scif_exit_ack,          /* SCIF_EXIT_ACK */
1000         scif_node_add,          /* SCIF_NODE_ADD */
1001         scif_node_add_ack,      /* SCIF_NODE_ADD_ACK */
1002         scif_node_add_nack,     /* SCIF_NODE_ADD_NACK */
1003         scif_node_remove,       /* SCIF_NODE_REMOVE */
1004         scif_node_remove_ack,   /* SCIF_NODE_REMOVE_ACK */
1005         scif_cnctreq,           /* SCIF_CNCT_REQ */
1006         scif_cnctgnt,           /* SCIF_CNCT_GNT */
1007         scif_cnctgnt_ack,       /* SCIF_CNCT_GNTACK */
1008         scif_cnctgnt_nack,      /* SCIF_CNCT_GNTNACK */
1009         scif_cnctrej,           /* SCIF_CNCT_REJ */
1010         scif_discnct,           /* SCIF_DISCNCT */
1011         scif_discnt_ack,        /* SCIF_DISCNT_ACK */
1012         scif_clientsend,        /* SCIF_CLIENT_SENT */
1013         scif_clientrcvd,        /* SCIF_CLIENT_RCVD */
1014         scif_get_node_info_resp,/* SCIF_GET_NODE_INFO */
1015 };
1016
1017 /**
1018  * scif_nodeqp_msg_handler() - Common handler for node messages
1019  * @scifdev: Remote device to respond to
1020  * @qp: Remote memory pointer
1021  * @msg: The message to be handled.
1022  *
1023  * This routine calls the appropriate routine to handle a Node Qp
1024  * message receipt
1025  */
1026 static int scif_max_msg_id = SCIF_MAX_MSG;
1027
1028 static void
1029 scif_nodeqp_msg_handler(struct scif_dev *scifdev,
1030                         struct scif_qp *qp, struct scifmsg *msg)
1031 {
1032         scif_display_message(scifdev, msg, "Rcvd");
1033
1034         if (msg->uop > (u32)scif_max_msg_id) {
1035                 /* Bogus Node Qp Message? */
1036                 dev_err(&scifdev->sdev->dev,
1037                         "Unknown message 0x%xn scifdev->node 0x%x\n",
1038                         msg->uop, scifdev->node);
1039                 return;
1040         }
1041
1042         scif_intr_func[msg->uop](scifdev, msg);
1043 }
1044
1045 /**
1046  * scif_nodeqp_intrhandler() - Interrupt handler for node messages
1047  * @scifdev:    Remote device to respond to
1048  * @qp:         Remote memory pointer
1049  *
1050  * This routine is triggered by the interrupt mechanism.  It reads
1051  * messages from the node queue RB and calls the Node QP Message handling
1052  * routine.
1053  */
1054 void scif_nodeqp_intrhandler(struct scif_dev *scifdev, struct scif_qp *qp)
1055 {
1056         struct scifmsg msg;
1057         int read_size;
1058
1059         do {
1060                 read_size = scif_rb_get_next(&qp->inbound_q, &msg, sizeof(msg));
1061                 if (!read_size)
1062                         break;
1063                 scif_nodeqp_msg_handler(scifdev, qp, &msg);
1064                 /*
1065                  * The node queue pair is unmapped so skip the read pointer
1066                  * update after receipt of a SCIF_EXIT_ACK
1067                  */
1068                 if (SCIF_EXIT_ACK == msg.uop)
1069                         break;
1070                 scif_rb_update_read_ptr(&qp->inbound_q);
1071         } while (1);
1072 }
1073
1074 /**
1075  * scif_loopb_wq_handler - Loopback Workqueue Handler.
1076  * @work: loop back work
1077  *
1078  * This work queue routine is invoked by the loopback work queue handler.
1079  * It grabs the recv lock, dequeues any available messages from the head
1080  * of the loopback message list, calls the node QP message handler,
1081  * waits for it to return, then frees up this message and dequeues more
1082  * elements of the list if available.
1083  */
1084 static void scif_loopb_wq_handler(struct work_struct *unused)
1085 {
1086         struct scif_dev *scifdev = scif_info.loopb_dev;
1087         struct scif_qp *qp = scifdev->qpairs;
1088         struct scif_loopb_msg *msg;
1089
1090         do {
1091                 msg = NULL;
1092                 spin_lock(&qp->recv_lock);
1093                 if (!list_empty(&scif_info.loopb_recv_q)) {
1094                         msg = list_first_entry(&scif_info.loopb_recv_q,
1095                                                struct scif_loopb_msg,
1096                                                list);
1097                         list_del(&msg->list);
1098                 }
1099                 spin_unlock(&qp->recv_lock);
1100
1101                 if (msg) {
1102                         scif_nodeqp_msg_handler(scifdev, qp, &msg->msg);
1103                         kfree(msg);
1104                 }
1105         } while (msg);
1106 }
1107
1108 /**
1109  * scif_loopb_msg_handler() - Workqueue handler for loopback messages.
1110  * @scifdev: SCIF device
1111  * @qp: Queue pair.
1112  *
1113  * This work queue routine is triggered when a loopback message is received.
1114  *
1115  * We need special handling for receiving Node Qp messages on a loopback SCIF
1116  * device via two workqueues for receiving messages.
1117  *
1118  * The reason we need the extra workqueue which is not required with *normal*
1119  * non-loopback SCIF devices is the potential classic deadlock described below:
1120  *
1121  * Thread A tries to send a message on a loopback SCIF device and blocks since
1122  * there is no space in the RB while it has the send_lock held or another
1123  * lock called lock X for example.
1124  *
1125  * Thread B: The Loopback Node QP message receive workqueue receives the message
1126  * and tries to send a message (eg an ACK) to the loopback SCIF device. It tries
1127  * to grab the send lock again or lock X and deadlocks with Thread A. The RB
1128  * cannot be drained any further due to this classic deadlock.
1129  *
1130  * In order to avoid deadlocks as mentioned above we have an extra level of
1131  * indirection achieved by having two workqueues.
1132  * 1) The first workqueue whose handler is scif_loopb_msg_handler reads
1133  * messages from the Node QP RB, adds them to a list and queues work for the
1134  * second workqueue.
1135  *
1136  * 2) The second workqueue whose handler is scif_loopb_wq_handler dequeues
1137  * messages from the list, handles them, frees up the memory and dequeues
1138  * more elements from the list if possible.
1139  */
1140 int
1141 scif_loopb_msg_handler(struct scif_dev *scifdev, struct scif_qp *qp)
1142 {
1143         int read_size;
1144         struct scif_loopb_msg *msg;
1145
1146         do {
1147                 msg = kmalloc(sizeof(*msg), GFP_KERNEL);
1148                 if (!msg)
1149                         return -ENOMEM;
1150                 read_size = scif_rb_get_next(&qp->inbound_q, &msg->msg,
1151                                              sizeof(struct scifmsg));
1152                 if (read_size != sizeof(struct scifmsg)) {
1153                         kfree(msg);
1154                         scif_rb_update_read_ptr(&qp->inbound_q);
1155                         break;
1156                 }
1157                 spin_lock(&qp->recv_lock);
1158                 list_add_tail(&msg->list, &scif_info.loopb_recv_q);
1159                 spin_unlock(&qp->recv_lock);
1160                 queue_work(scif_info.loopb_wq, &scif_info.loopb_work);
1161                 scif_rb_update_read_ptr(&qp->inbound_q);
1162         } while (read_size == sizeof(struct scifmsg));
1163         return read_size;
1164 }
1165
1166 /**
1167  * scif_setup_loopback_qp - One time setup work for Loopback Node Qp.
1168  * @scifdev: SCIF device
1169  *
1170  * Sets up the required loopback workqueues, queue pairs and ring buffers
1171  */
1172 int scif_setup_loopback_qp(struct scif_dev *scifdev)
1173 {
1174         int err = 0;
1175         void *local_q;
1176         struct scif_qp *qp;
1177         struct scif_peer_dev *spdev;
1178
1179         err = scif_setup_intr_wq(scifdev);
1180         if (err)
1181                 goto exit;
1182         INIT_LIST_HEAD(&scif_info.loopb_recv_q);
1183         snprintf(scif_info.loopb_wqname, sizeof(scif_info.loopb_wqname),
1184                  "SCIF LOOPB %d", scifdev->node);
1185         scif_info.loopb_wq =
1186                 alloc_ordered_workqueue(scif_info.loopb_wqname, 0);
1187         if (!scif_info.loopb_wq) {
1188                 err = -ENOMEM;
1189                 goto destroy_intr;
1190         }
1191         INIT_WORK(&scif_info.loopb_work, scif_loopb_wq_handler);
1192         /* Allocate Self Qpair */
1193         scifdev->qpairs = kzalloc(sizeof(*scifdev->qpairs), GFP_KERNEL);
1194         if (!scifdev->qpairs) {
1195                 err = -ENOMEM;
1196                 goto destroy_loopb_wq;
1197         }
1198
1199         qp = scifdev->qpairs;
1200         qp->magic = SCIFEP_MAGIC;
1201         spin_lock_init(&qp->send_lock);
1202         spin_lock_init(&qp->recv_lock);
1203
1204         local_q = kzalloc(SCIF_NODE_QP_SIZE, GFP_KERNEL);
1205         if (!local_q) {
1206                 err = -ENOMEM;
1207                 goto free_qpairs;
1208         }
1209         /*
1210          * For loopback the inbound_q and outbound_q are essentially the same
1211          * since the Node sends a message on the loopback interface to the
1212          * outbound_q which is then received on the inbound_q.
1213          */
1214         scif_rb_init(&qp->outbound_q,
1215                      &qp->local_read,
1216                      &qp->local_write,
1217                      local_q, get_count_order(SCIF_NODE_QP_SIZE));
1218
1219         scif_rb_init(&qp->inbound_q,
1220                      &qp->local_read,
1221                      &qp->local_write,
1222                      local_q, get_count_order(SCIF_NODE_QP_SIZE));
1223         scif_info.nodeid = scifdev->node;
1224         spdev = scif_peer_register_device(scifdev);
1225         if (IS_ERR(spdev)) {
1226                 err = PTR_ERR(spdev);
1227                 goto free_local_q;
1228         }
1229         scif_info.loopb_dev = scifdev;
1230         return err;
1231 free_local_q:
1232         kfree(local_q);
1233 free_qpairs:
1234         kfree(scifdev->qpairs);
1235 destroy_loopb_wq:
1236         destroy_workqueue(scif_info.loopb_wq);
1237 destroy_intr:
1238         scif_destroy_intr_wq(scifdev);
1239 exit:
1240         return err;
1241 }
1242
1243 /**
1244  * scif_destroy_loopback_qp - One time uninit work for Loopback Node Qp
1245  * @scifdev: SCIF device
1246  *
1247  * Destroys the workqueues and frees up the Ring Buffer and Queue Pair memory.
1248  */
1249 int scif_destroy_loopback_qp(struct scif_dev *scifdev)
1250 {
1251         struct scif_peer_dev *spdev;
1252
1253         rcu_read_lock();
1254         spdev = rcu_dereference(scifdev->spdev);
1255         rcu_read_unlock();
1256         if (spdev)
1257                 scif_peer_unregister_device(spdev);
1258         destroy_workqueue(scif_info.loopb_wq);
1259         scif_destroy_intr_wq(scifdev);
1260         kfree(scifdev->qpairs->outbound_q.rb_base);
1261         kfree(scifdev->qpairs);
1262         scifdev->sdev = NULL;
1263         scif_info.loopb_dev = NULL;
1264         return 0;
1265 }
1266
1267 void scif_destroy_p2p(struct scif_dev *scifdev)
1268 {
1269         struct scif_dev *peer_dev;
1270         struct scif_p2p_info *p2p;
1271         struct list_head *pos, *tmp;
1272         int bd;
1273
1274         mutex_lock(&scif_info.conflock);
1275         /* Free P2P mappings in the given node for all its peer nodes */
1276         list_for_each_safe(pos, tmp, &scifdev->p2p) {
1277                 p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
1278                 dma_unmap_sg(&scifdev->sdev->dev, p2p->ppi_sg[SCIF_PPI_MMIO],
1279                              p2p->sg_nentries[SCIF_PPI_MMIO],
1280                              DMA_BIDIRECTIONAL);
1281                 dma_unmap_sg(&scifdev->sdev->dev, p2p->ppi_sg[SCIF_PPI_APER],
1282                              p2p->sg_nentries[SCIF_PPI_APER],
1283                              DMA_BIDIRECTIONAL);
1284                 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
1285                 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
1286                 list_del(pos);
1287                 kfree(p2p);
1288         }
1289
1290         /* Free P2P mapping created in the peer nodes for the given node */
1291         for (bd = SCIF_MGMT_NODE + 1; bd <= scif_info.maxid; bd++) {
1292                 peer_dev = &scif_dev[bd];
1293                 list_for_each_safe(pos, tmp, &peer_dev->p2p) {
1294                         p2p = list_entry(pos, struct scif_p2p_info, ppi_list);
1295                         if (p2p->ppi_peer_id == scifdev->node) {
1296                                 dma_unmap_sg(&peer_dev->sdev->dev,
1297                                              p2p->ppi_sg[SCIF_PPI_MMIO],
1298                                              p2p->sg_nentries[SCIF_PPI_MMIO],
1299                                              DMA_BIDIRECTIONAL);
1300                                 dma_unmap_sg(&peer_dev->sdev->dev,
1301                                              p2p->ppi_sg[SCIF_PPI_APER],
1302                                              p2p->sg_nentries[SCIF_PPI_APER],
1303                                              DMA_BIDIRECTIONAL);
1304                                 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_MMIO]);
1305                                 scif_p2p_freesg(p2p->ppi_sg[SCIF_PPI_APER]);
1306                                 list_del(pos);
1307                                 kfree(p2p);
1308                         }
1309                 }
1310         }
1311         mutex_unlock(&scif_info.conflock);
1312 }