/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_req.h"

#include "drbd_vli.h"

struct packet_info {
        enum drbd_packet cmd;
        unsigned int size;
        unsigned int vnr;
        void *data;
};

enum finish_epoch {
        FE_STILL_LIVE,
        FE_DESTROYED,
        FE_RECYCLED,
};

static int drbd_do_features(struct drbd_tconn *tconn);
static int drbd_do_auth(struct drbd_tconn *tconn);
static int drbd_disconnected(struct drbd_conf *mdev);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */
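
/*
 * Illustrative layout (editor's sketch, not from the original source):
 * with three pages A, B, C chained via page->private,
 *
 *   *head -> A,  A->private == B,  B->private == C,  C->private == 0
 *
 * page_chain_del(head, 2) below unlinks and returns the sub-chain A->B
 * (re-terminated with 0) and leaves *head pointing at C.
 */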

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
        struct page *page;
        struct page *tmp;

        BUG_ON(!n);
        BUG_ON(!head);

        page = *head;

        if (!page)
                return NULL;

        while (page) {
                tmp = page_chain_next(page);
                if (--n == 0)
                        break; /* found sufficient pages */
                if (tmp == NULL)
                        /* insufficient pages, don't use any of them. */
                        return NULL;
                page = tmp;
        }

        /* add end of list marker for the returned list */
        set_page_private(page, 0);
        /* actual return value, and adjustment of head */
        page = *head;
        *head = tmp;
        return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
        struct page *tmp;
        int i = 1;
        while ((tmp = page_chain_next(page)))
                ++i, page = tmp;
        if (len)
                *len = i;
        return page;
}

static int page_chain_free(struct page *page)
{
        struct page *tmp;
        int i = 0;
        page_chain_for_each_safe(page, tmp) {
                put_page(page);
                ++i;
        }
        return i;
}

static void page_chain_add(struct page **head,
                struct page *chain_first, struct page *chain_last)
{
#if 1
        struct page *tmp;
        tmp = page_chain_tail(chain_first, NULL);
        BUG_ON(tmp != chain_last);
#endif

        /* add chain to head */
        set_page_private(chain_last, (unsigned long)*head);
        *head = chain_first;
}

static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
                                       unsigned int number)
{
        struct page *page = NULL;
        struct page *tmp = NULL;
        unsigned int i = 0;

        /* Yes, testing drbd_pp_vacant outside the lock is racy.
         * So what. It saves a spin_lock. */
        if (drbd_pp_vacant >= number) {
                spin_lock(&drbd_pp_lock);
                page = page_chain_del(&drbd_pp_pool, number);
                if (page)
                        drbd_pp_vacant -= number;
                spin_unlock(&drbd_pp_lock);
                if (page)
                        return page;
        }

        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
        for (i = 0; i < number; i++) {
                tmp = alloc_page(GFP_TRY);
                if (!tmp)
                        break;
                set_page_private(tmp, (unsigned long)page);
                page = tmp;
        }

        if (i == number)
                return page;

        /* Not enough pages immediately available this time.
         * No need to jump around here, drbd_alloc_pages will retry this
         * function "soon". */
        if (page) {
                tmp = page_chain_tail(page, NULL);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        return NULL;
}

static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
                                           struct list_head *to_be_freed)
{
        struct drbd_peer_request *peer_req;
        struct list_head *le, *tle;

        /* The EEs are always appended to the end of the list. Since
           they are sent in order over the wire, they have to finish
           in order. As soon as we see the first one that has not
           finished, we can stop examining the list... */

        list_for_each_safe(le, tle, &mdev->net_ee) {
                peer_req = list_entry(le, struct drbd_peer_request, w.list);
                if (drbd_peer_req_has_active_page(peer_req))
                        break;
                list_move(le, to_be_freed);
        }
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;

        spin_lock_irq(&mdev->tconn->req_lock);
        reclaim_finished_net_peer_reqs(mdev, &reclaimed);
        spin_unlock_irq(&mdev->tconn->req_lock);

        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(mdev, peer_req);
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @mdev:       DRBD device.
 * @number:     number of pages requested
 * @retry:      whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
                              bool retry)
{
        struct page *page = NULL;
        struct net_conf *nc;
        DEFINE_WAIT(wait);
        int mxb;

        /* Yes, we may run up to @number over max_buffers. If we
         * follow it strictly, the admin will get it wrong anyways. */
        rcu_read_lock();
        nc = rcu_dereference(mdev->tconn->net_conf);
        mxb = nc ? nc->max_buffers : 1000000;
        rcu_read_unlock();

        if (atomic_read(&mdev->pp_in_use) < mxb)
                page = __drbd_alloc_pages(mdev, number);

        while (page == NULL) {
                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

                drbd_kick_lo_and_reclaim_net(mdev);

                if (atomic_read(&mdev->pp_in_use) < mxb) {
                        page = __drbd_alloc_pages(mdev, number);
                        if (page)
                                break;
                }

                if (!retry)
                        break;

                if (signal_pending(current)) {
                        dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
                        break;
                }

                schedule();
        }
        finish_wait(&drbd_pp_wait, &wait);

        if (page)
                atomic_add(number, &mdev->pp_in_use);
        return page;
}
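
/*
 * Typical pairing (editor's sketch; the real callers are
 * drbd_alloc_peer_req() and drbd_drain_block() further down):
 *
 *        page = drbd_alloc_pages(mdev, nr_pages, true);
 *        if (page) {
 *                ... fill the chain, submit I/O ...
 *                drbd_free_pages(mdev, page, 0);
 *        }
 */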

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
{
        atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
        int i;

        if (page == NULL)
                return;

        if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
                i = page_chain_free(page);
        else {
                struct page *tmp;
                tmp = page_chain_tail(page, &i);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        i = atomic_sub_return(i, a);
        if (i < 0)
                dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
                        is_net ? "pp_in_use_by_net" : "pp_in_use", i);
        wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_peer_req()
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_ee_fix_bhs()
 drbd_finish_peer_reqs()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
                    unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
{
        struct drbd_peer_request *peer_req;
        struct page *page = NULL;
        unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;

        if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
                return NULL;

        peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
        if (!peer_req) {
                if (!(gfp_mask & __GFP_NOWARN))
                        dev_err(DEV, "%s: allocation failed\n", __func__);
                return NULL;
        }

        if (data_size) {
                page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
                if (!page)
                        goto fail;
        }

        drbd_clear_interval(&peer_req->i);
        peer_req->i.size = data_size;
        peer_req->i.sector = sector;
        peer_req->i.local = false;
        peer_req->i.waiting = false;

        peer_req->epoch = NULL;
        peer_req->w.mdev = mdev;
        peer_req->pages = page;
        atomic_set(&peer_req->pending_bios, 0);
        peer_req->flags = 0;
        /*
         * The block_id is opaque to the receiver.  It is not endianness
         * converted, and sent back to the sender unchanged.
         */
        peer_req->block_id = id;

        return peer_req;

 fail:
        mempool_free(peer_req, drbd_ee_mempool);
        return NULL;
}
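
/*
 * Usage sketch (editor's illustration; read_in_block() below is the real
 * caller): allocate from receiver context with GFP_NOIO, release with
 * drbd_free_peer_req() on every error path.
 *
 *        peer_req = drbd_alloc_peer_req(mdev, id, sector, size, GFP_NOIO);
 *        if (!peer_req)
 *                return NULL;
 *        ...
 *        drbd_free_peer_req(mdev, peer_req);
 */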

void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
                       int is_net)
{
        if (peer_req->flags & EE_HAS_DIGEST)
                kfree(peer_req->digest);
        drbd_free_pages(mdev, peer_req->pages, is_net);
        D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
        D_ASSERT(drbd_interval_empty(&peer_req->i));
        mempool_free(peer_req, drbd_ee_mempool);
}

int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
{
        LIST_HEAD(work_list);
        struct drbd_peer_request *peer_req, *t;
        int count = 0;
        int is_net = list == &mdev->net_ee;

        spin_lock_irq(&mdev->tconn->req_lock);
        list_splice_init(list, &work_list);
        spin_unlock_irq(&mdev->tconn->req_lock);

        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                __drbd_free_peer_req(mdev, peer_req, is_net);
                count++;
        }
        return count;
}

/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
{
        LIST_HEAD(work_list);
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;
        int err = 0;

        spin_lock_irq(&mdev->tconn->req_lock);
        reclaim_finished_net_peer_reqs(mdev, &reclaimed);
        list_splice_init(&mdev->done_ee, &work_list);
        spin_unlock_irq(&mdev->tconn->req_lock);

        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(mdev, peer_req);

        /* possible callbacks here:
         * e_end_block, and e_end_resync_block, e_send_superseded.
         * all ignore the last argument.
         */
        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                int err2;

                /* list_del not necessary, next/prev members not touched */
                err2 = peer_req->w.cb(&peer_req->w, !!err);
                if (!err)
                        err = err2;
                drbd_free_peer_req(mdev, peer_req);
        }
        wake_up(&mdev->ee_wait);

        return err;
}

static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
                                     struct list_head *head)
{
        DEFINE_WAIT(wait);

        /* avoids spin_lock/unlock
         * and calling prepare_to_wait in the fast path */
        while (!list_empty(head)) {
                prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
                spin_unlock_irq(&mdev->tconn->req_lock);
                io_schedule();
                finish_wait(&mdev->ee_wait, &wait);
                spin_lock_irq(&mdev->tconn->req_lock);
        }
}

static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
                                    struct list_head *head)
{
        spin_lock_irq(&mdev->tconn->req_lock);
        _drbd_wait_ee_list_empty(mdev, head);
        spin_unlock_irq(&mdev->tconn->req_lock);
}

static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
        mm_segment_t oldfs;
        struct kvec iov = {
                .iov_base = buf,
                .iov_len = size,
        };
        struct msghdr msg = {
                .msg_iovlen = 1,
                .msg_iov = (struct iovec *)&iov,
                .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
        };
        int rv;

        oldfs = get_fs();
        set_fs(KERNEL_DS);
        rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
        set_fs(oldfs);

        return rv;
}
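
/*
 * Editor's note: the get_fs()/set_fs(KERNEL_DS) bracket above lets
 * sock_recvmsg() accept a kernel-space buffer; on kernels of this vintage
 * the socket layer would otherwise verify the iovec against user space.
 */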

static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
{
        int rv;

        rv = drbd_recv_short(tconn->data.socket, buf, size, 0);

        if (rv < 0) {
                if (rv == -ECONNRESET)
                        conn_info(tconn, "sock was reset by peer\n");
                else if (rv != -ERESTARTSYS)
                        conn_err(tconn, "sock_recvmsg returned %d\n", rv);
        } else if (rv == 0) {
                if (test_bit(DISCONNECT_SENT, &tconn->flags)) {
                        long t;
                        rcu_read_lock();
                        t = rcu_dereference(tconn->net_conf)->ping_timeo * HZ/10;
                        rcu_read_unlock();

                        t = wait_event_timeout(tconn->ping_wait, tconn->cstate < C_WF_REPORT_PARAMS, t);

                        if (t)
                                goto out;
                }
                conn_info(tconn, "sock was shut down by peer\n");
        }

        if (rv != size)
                conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);

out:
        return rv;
}

static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
{
        int err;

        err = drbd_recv(tconn, buf, size);
        if (err != size) {
                if (err >= 0)
                        err = -EIO;
        } else
                err = 0;
        return err;
}

static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
{
        int err;

        err = drbd_recv_all(tconn, buf, size);
        if (err && !signal_pending(current))
                conn_warn(tconn, "short read (expected size %d)\n", (int)size);
        return err;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
                unsigned int rcv)
{
        /* open coded SO_SNDBUF, SO_RCVBUF */
        if (snd) {
                sock->sk->sk_sndbuf = snd;
                sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
        }
        if (rcv) {
                sock->sk->sk_rcvbuf = rcv;
                sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
        }
}
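
/*
 * Editor's note: this open-codes what setsockopt() with SO_SNDBUF/SO_RCVBUF
 * would do from user space; the SOCK_SNDBUF_LOCK/SOCK_RCVBUF_LOCK bits keep
 * TCP autotuning from overriding the explicitly configured sizes later on.
 */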

static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
{
        const char *what;
        struct socket *sock;
        struct sockaddr_in6 src_in6;
        struct sockaddr_in6 peer_in6;
        struct net_conf *nc;
        int err, peer_addr_len, my_addr_len;
        int sndbuf_size, rcvbuf_size, connect_int;
        int disconnect_on_error = 1;

        rcu_read_lock();
        nc = rcu_dereference(tconn->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }
        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        connect_int = nc->connect_int;
        rcu_read_unlock();

        my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
        memcpy(&src_in6, &tconn->my_addr, my_addr_len);

        if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
                src_in6.sin6_port = 0;
        else
                ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

        peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
        memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &sock);
        if (err < 0) {
                sock = NULL;
                goto out;
        }

        sock->sk->sk_rcvtimeo =
        sock->sk->sk_sndtimeo = connect_int * HZ;
        drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

        /* explicitly bind to the configured IP as source IP
         * for the outgoing connections.
         * This is needed for multihomed hosts and to be
         * able to use lo: interfaces for drbd.
         * Make sure to use 0 as port number, so linux selects
         * a free one dynamically.
         */
        what = "bind before connect";
        err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
        if (err < 0)
                goto out;

        /* connect may fail, peer not yet available.
         * stay C_WF_CONNECTION, don't go Disconnecting! */
        disconnect_on_error = 0;
        what = "connect";
        err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
        if (err < 0) {
                if (sock) {
                        sock_release(sock);
                        sock = NULL;
                }
                switch (-err) {
                        /* timeout, busy, signal pending */
                case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
                case EINTR: case ERESTARTSYS:
                        /* peer not (yet) available, network problem */
                case ECONNREFUSED: case ENETUNREACH:
                case EHOSTDOWN:    case EHOSTUNREACH:
                        disconnect_on_error = 0;
                        break;
                default:
                        conn_err(tconn, "%s failed, err = %d\n", what, err);
                }
                if (disconnect_on_error)
                        conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
        }

        return sock;
}

struct accept_wait_data {
        struct drbd_tconn *tconn;
        struct socket *s_listen;
        struct completion door_bell;
        void (*original_sk_state_change)(struct sock *sk);

};

static void drbd_incoming_connection(struct sock *sk)
{
        struct accept_wait_data *ad = sk->sk_user_data;
        void (*state_change)(struct sock *sk);

        state_change = ad->original_sk_state_change;
        if (sk->sk_state == TCP_ESTABLISHED)
                complete(&ad->door_bell);
        state_change(sk);
}
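
/*
 * Editor's note: this is the usual sk_state_change interception pattern.
 * prepare_listen_socket() below saves the listen socket's original callback
 * in ad->original_sk_state_change and points sk_state_change at the
 * function above; on TCP_ESTABLISHED it signals the waiter through the
 * completion, then chains to the saved callback so normal socket wakeups
 * still happen.
 */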

static int prepare_listen_socket(struct drbd_tconn *tconn, struct accept_wait_data *ad)
{
        int err, sndbuf_size, rcvbuf_size, my_addr_len;
        struct sockaddr_in6 my_addr;
        struct socket *s_listen;
        struct net_conf *nc;
        const char *what;

        rcu_read_lock();
        nc = rcu_dereference(tconn->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return -EIO;
        }
        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        rcu_read_unlock();

        my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
        memcpy(&my_addr, &tconn->my_addr, my_addr_len);

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &s_listen);
        if (err) {
                s_listen = NULL;
                goto out;
        }

        s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
        drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

        what = "bind before listen";
        err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
        if (err < 0)
                goto out;

        ad->s_listen = s_listen;
        write_lock_bh(&s_listen->sk->sk_callback_lock);
        ad->original_sk_state_change = s_listen->sk->sk_state_change;
        s_listen->sk->sk_state_change = drbd_incoming_connection;
        s_listen->sk->sk_user_data = ad;
        write_unlock_bh(&s_listen->sk->sk_callback_lock);

        what = "listen";
        err = s_listen->ops->listen(s_listen, 5);
        if (err < 0)
                goto out;

        return 0;
out:
        if (s_listen)
                sock_release(s_listen);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        conn_err(tconn, "%s failed, err = %d\n", what, err);
                        conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        }

        return -EIO;
}

static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
        write_lock_bh(&sk->sk_callback_lock);
        sk->sk_state_change = ad->original_sk_state_change;
        sk->sk_user_data = NULL;
        write_unlock_bh(&sk->sk_callback_lock);
}

static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn, struct accept_wait_data *ad)
{
        int timeo, connect_int, err = 0;
        struct socket *s_estab = NULL;
        struct net_conf *nc;

        rcu_read_lock();
        nc = rcu_dereference(tconn->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }
        connect_int = nc->connect_int;
        rcu_read_unlock();

        timeo = connect_int * HZ;
        timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */

        err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
        if (err <= 0)
                return NULL;

        err = kernel_accept(ad->s_listen, &s_estab, 0);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        conn_err(tconn, "accept failed, err = %d\n", err);
                        conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        }

        if (s_estab)
                unregister_state_change(s_estab->sk, ad);

        return s_estab;
}

static int decode_header(struct drbd_tconn *, void *, struct packet_info *);

static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
                             enum drbd_packet cmd)
{
        if (!conn_prepare_command(tconn, sock))
                return -EIO;
        return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
}

static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
{
        unsigned int header_size = drbd_header_size(tconn);
        struct packet_info pi;
        int err;

        err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
        if (err != header_size) {
                if (err >= 0)
                        err = -EIO;
                return err;
        }
        err = decode_header(tconn, tconn->data.rbuf, &pi);
        if (err)
                return err;
        return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:       pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
{
        int rr;
        char tb[4];

        if (!*sock)
                return false;

        rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

        if (rr > 0 || rr == -EAGAIN) {
                return true;
        } else {
                sock_release(*sock);
                *sock = NULL;
                return false;
        }
}
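
/*
 * Editor's note: the MSG_DONTWAIT | MSG_PEEK receive above is a cheap
 * liveness probe: it returns > 0 when data is pending and -EAGAIN when
 * the connection is alive but idle; 0 or any other error means the peer
 * shut the socket down, so we release it and report false.
 */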
/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(struct drbd_conf *mdev)
{
        int err;

        atomic_set(&mdev->packet_seq, 0);
        mdev->peer_seq = 0;

        mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
                &mdev->tconn->cstate_mutex :
                &mdev->own_state_mutex;

        err = drbd_send_sync_param(mdev);
        if (!err)
                err = drbd_send_sizes(mdev, 0, 0);
        if (!err)
                err = drbd_send_uuids(mdev);
        if (!err)
                err = drbd_send_current_state(mdev);
        clear_bit(USE_DEGR_WFC_T, &mdev->flags);
        clear_bit(RESIZE_PENDING, &mdev->flags);
        mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
        return err;
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int conn_connect(struct drbd_tconn *tconn)
{
        struct drbd_socket sock, msock;
        struct drbd_conf *mdev;
        struct net_conf *nc;
        int vnr, timeout, h, ok;
        bool discard_my_data;
        enum drbd_state_rv rv;
        struct accept_wait_data ad = {
                .tconn = tconn,
                .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
        };

        clear_bit(DISCONNECT_SENT, &tconn->flags);
        if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
                return -2;

        mutex_init(&sock.mutex);
        sock.sbuf = tconn->data.sbuf;
        sock.rbuf = tconn->data.rbuf;
        sock.socket = NULL;
        mutex_init(&msock.mutex);
        msock.sbuf = tconn->meta.sbuf;
        msock.rbuf = tconn->meta.rbuf;
        msock.socket = NULL;

        /* Assume that the peer only understands protocol 80 until we know better.  */
        tconn->agreed_pro_version = 80;

        if (prepare_listen_socket(tconn, &ad))
                return 0;

        do {
                struct socket *s;

                s = drbd_try_connect(tconn);
                if (s) {
                        if (!sock.socket) {
                                sock.socket = s;
                                send_first_packet(tconn, &sock, P_INITIAL_DATA);
                        } else if (!msock.socket) {
                                clear_bit(RESOLVE_CONFLICTS, &tconn->flags);
                                msock.socket = s;
                                send_first_packet(tconn, &msock, P_INITIAL_META);
                        } else {
                                conn_err(tconn, "Logic error in conn_connect()\n");
                                goto out_release_sockets;
                        }
                }

                if (sock.socket && msock.socket) {
                        rcu_read_lock();
                        nc = rcu_dereference(tconn->net_conf);
                        timeout = nc->ping_timeo * HZ / 10;
                        rcu_read_unlock();
                        schedule_timeout_interruptible(timeout);
                        ok = drbd_socket_okay(&sock.socket);
                        ok = drbd_socket_okay(&msock.socket) && ok;
                        if (ok)
                                break;
                }

retry:
                s = drbd_wait_for_connect(tconn, &ad);
                if (s) {
                        int fp = receive_first_packet(tconn, s);
                        drbd_socket_okay(&sock.socket);
                        drbd_socket_okay(&msock.socket);
                        switch (fp) {
                        case P_INITIAL_DATA:
                                if (sock.socket) {
                                        conn_warn(tconn, "initial packet S crossed\n");
                                        sock_release(sock.socket);
                                        sock.socket = s;
                                        goto randomize;
                                }
                                sock.socket = s;
                                break;
                        case P_INITIAL_META:
                                set_bit(RESOLVE_CONFLICTS, &tconn->flags);
                                if (msock.socket) {
                                        conn_warn(tconn, "initial packet M crossed\n");
                                        sock_release(msock.socket);
                                        msock.socket = s;
                                        goto randomize;
                                }
                                msock.socket = s;
                                break;
                        default:
                                conn_warn(tconn, "Error receiving initial packet\n");
                                sock_release(s);
randomize:
                                if (random32() & 1)
                                        goto retry;
                        }
                }

                if (tconn->cstate <= C_DISCONNECTING)
                        goto out_release_sockets;
                if (signal_pending(current)) {
                        flush_signals(current);
                        smp_rmb();
                        if (get_t_state(&tconn->receiver) == EXITING)
                                goto out_release_sockets;
                }

                ok = drbd_socket_okay(&sock.socket);
                ok = drbd_socket_okay(&msock.socket) && ok;
        } while (!ok);

        if (ad.s_listen)
                sock_release(ad.s_listen);

        sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
        msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

        sock.socket->sk->sk_allocation = GFP_NOIO;
        msock.socket->sk->sk_allocation = GFP_NOIO;

        sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
        msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

        /* NOT YET ...
         * sock.socket->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
         * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
         * first set it to the P_CONNECTION_FEATURES timeout,
         * which we set to 4x the configured ping_timeout. */
        rcu_read_lock();
        nc = rcu_dereference(tconn->net_conf);

        sock.socket->sk->sk_sndtimeo =
        sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

        msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
        timeout = nc->timeout * HZ / 10;
        discard_my_data = nc->discard_my_data;
        rcu_read_unlock();

        msock.socket->sk->sk_sndtimeo = timeout;

        /* we don't want delays.
         * we use TCP_CORK where appropriate, though */
        drbd_tcp_nodelay(sock.socket);
        drbd_tcp_nodelay(msock.socket);

        tconn->data.socket = sock.socket;
        tconn->meta.socket = msock.socket;
        tconn->last_received = jiffies;

        h = drbd_do_features(tconn);
        if (h <= 0)
                return h;

        if (tconn->cram_hmac_tfm) {
                /* drbd_request_state(mdev, NS(conn, WFAuth)); */
                switch (drbd_do_auth(tconn)) {
                case -1:
                        conn_err(tconn, "Authentication of peer failed\n");
                        return -1;
                case 0:
                        conn_err(tconn, "Authentication of peer failed, trying again.\n");
                        return 0;
                }
        }

        tconn->data.socket->sk->sk_sndtimeo = timeout;
        tconn->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

        if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
                return -1;

        set_bit(STATE_SENT, &tconn->flags);

        rcu_read_lock();
        idr_for_each_entry(&tconn->volumes, mdev, vnr) {
                kref_get(&mdev->kref);
                rcu_read_unlock();

                if (discard_my_data)
                        set_bit(DISCARD_MY_DATA, &mdev->flags);
                else
                        clear_bit(DISCARD_MY_DATA, &mdev->flags);

                drbd_connected(mdev);
                kref_put(&mdev->kref, &drbd_minor_destroy);
                rcu_read_lock();
        }
        rcu_read_unlock();

        rv = conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
        if (rv < SS_SUCCESS) {
                clear_bit(STATE_SENT, &tconn->flags);
                return 0;
        }

        drbd_thread_start(&tconn->asender);

        mutex_lock(&tconn->conf_update);
        /* The discard_my_data flag is a single-shot modifier to the next
         * connection attempt, the handshake of which is now well underway.
         * No need for rcu style copying of the whole struct
         * just to clear a single value. */
        tconn->net_conf->discard_my_data = 0;
        mutex_unlock(&tconn->conf_update);

        return h;

out_release_sockets:
        if (ad.s_listen)
                sock_release(ad.s_listen);
        if (sock.socket)
                sock_release(sock.socket);
        if (msock.socket)
                sock_release(msock.socket);
        return -1;
}

static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
{
        unsigned int header_size = drbd_header_size(tconn);

        if (header_size == sizeof(struct p_header100) &&
            *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
                struct p_header100 *h = header;
                if (h->pad != 0) {
                        conn_err(tconn, "Header padding is not zero\n");
                        return -EINVAL;
                }
                pi->vnr = be16_to_cpu(h->volume);
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
        } else if (header_size == sizeof(struct p_header95) &&
                   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
                struct p_header95 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
                pi->vnr = 0;
        } else if (header_size == sizeof(struct p_header80) &&
                   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
                struct p_header80 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be16_to_cpu(h->length);
                pi->vnr = 0;
        } else {
                conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
                         be32_to_cpu(*(__be32 *)header),
                         tconn->agreed_pro_version);
                return -EINVAL;
        }
        pi->data = header + header_size;
        return 0;
}
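
/*
 * Header formats recognized above (editor's summary of the code):
 *
 *   p_header100: 32-bit magic DRBD_MAGIC_100; carries volume, command, length
 *   p_header95:  16-bit magic DRBD_MAGIC_BIG; command, 32-bit length
 *   p_header80:  32-bit magic DRBD_MAGIC;     command, 16-bit length
 *
 * The two legacy formats carry no volume number, so pi->vnr defaults to 0.
 */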

static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
{
        void *buffer = tconn->data.rbuf;
        int err;

        err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
        if (err)
                return err;

        err = decode_header(tconn, buffer, pi);
        tconn->last_received = jiffies;

        return err;
}

static void drbd_flush(struct drbd_tconn *tconn)
{
        int rv;
        struct drbd_conf *mdev;
        int vnr;

        if (tconn->write_ordering >= WO_bdev_flush) {
                rcu_read_lock();
                idr_for_each_entry(&tconn->volumes, mdev, vnr) {
                        if (!get_ldev(mdev))
                                continue;
                        kref_get(&mdev->kref);
                        rcu_read_unlock();

                        rv = blkdev_issue_flush(mdev->ldev->backing_bdev,
                                        GFP_NOIO, NULL);
                        if (rv) {
                                dev_info(DEV, "local disk flush failed with status %d\n", rv);
                                /* would rather check on EOPNOTSUPP, but that is not reliable.
                                 * don't try again for ANY return value != 0
                                 * if (rv == -EOPNOTSUPP) */
                                drbd_bump_write_ordering(tconn, WO_drain_io);
                        }
                        put_ldev(mdev);
                        kref_put(&mdev->kref, &drbd_minor_destroy);

                        rcu_read_lock();
                        if (rv)
                                break;
                }
                rcu_read_unlock();
        }
}
/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @tconn:      DRBD connection.
 * @epoch:      Epoch object.
 * @ev:         Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *tconn,
                                               struct drbd_epoch *epoch,
                                               enum epoch_event ev)
{
        int epoch_size;
        struct drbd_epoch *next_epoch;
        enum finish_epoch rv = FE_STILL_LIVE;

        spin_lock(&tconn->epoch_lock);
        do {
                next_epoch = NULL;

                epoch_size = atomic_read(&epoch->epoch_size);

                switch (ev & ~EV_CLEANUP) {
                case EV_PUT:
                        atomic_dec(&epoch->active);
                        break;
                case EV_GOT_BARRIER_NR:
                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
                        break;
                case EV_BECAME_LAST:
                        /* nothing to do */
                        break;
                }

                if (epoch_size != 0 &&
                    atomic_read(&epoch->active) == 0 &&
                    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
                        if (!(ev & EV_CLEANUP)) {
                                spin_unlock(&tconn->epoch_lock);
                                drbd_send_b_ack(epoch->tconn, epoch->barrier_nr, epoch_size);
                                spin_lock(&tconn->epoch_lock);
                        }
#if 0
                        /* FIXME: dec unacked on connection, once we have
                         * something to count pending connection packets in. */
                        if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
                                dec_unacked(epoch->tconn);
#endif

                        if (tconn->current_epoch != epoch) {
                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
                                list_del(&epoch->list);
                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
                                tconn->epochs--;
                                kfree(epoch);

                                if (rv == FE_STILL_LIVE)
                                        rv = FE_DESTROYED;
                        } else {
                                epoch->flags = 0;
                                atomic_set(&epoch->epoch_size, 0);
                                /* atomic_set(&epoch->active, 0); is already zero */
                                if (rv == FE_STILL_LIVE)
                                        rv = FE_RECYCLED;
                        }
                }

                if (!next_epoch)
                        break;

                epoch = next_epoch;
        } while (1);

        spin_unlock(&tconn->epoch_lock);

        return rv;
}
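
/*
 * Editor's summary of the logic above: an epoch may only be finished once
 * it contains at least one request, none of its writes are still active,
 * and it has received its barrier number (or we are in cleanup).  Finishing
 * either frees the epoch (FE_DESTROYED) or, if it is still the current
 * epoch, resets it for reuse (FE_RECYCLED).
 */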

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @tconn:      DRBD connection.
 * @wo:         Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_tconn *tconn, enum write_ordering_e wo)
{
        struct disk_conf *dc;
        struct drbd_conf *mdev;
        enum write_ordering_e pwo;
        int vnr;
        static char *write_ordering_str[] = {
                [WO_none] = "none",
                [WO_drain_io] = "drain",
                [WO_bdev_flush] = "flush",
        };

        pwo = tconn->write_ordering;
        wo = min(pwo, wo);
        rcu_read_lock();
        idr_for_each_entry(&tconn->volumes, mdev, vnr) {
                if (!get_ldev_if_state(mdev, D_ATTACHING))
                        continue;
                dc = rcu_dereference(mdev->ldev->disk_conf);

                if (wo == WO_bdev_flush && !dc->disk_flushes)
                        wo = WO_drain_io;
                if (wo == WO_drain_io && !dc->disk_drain)
                        wo = WO_none;
                put_ldev(mdev);
        }
        rcu_read_unlock();
        tconn->write_ordering = wo;
        if (pwo != tconn->write_ordering || wo == WO_bdev_flush)
                conn_info(tconn, "Method to ensure write ordering: %s\n", write_ordering_str[tconn->write_ordering]);
}
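
/*
 * Editor's note: the fallback is strictly one-way, flush -> drain -> none;
 * min(pwo, wo) above ensures we never upgrade again, and a volume with
 * disk_flushes/disk_drain disabled can force a further downgrade for the
 * whole connection.
 */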

/**
 * drbd_submit_peer_request()
 * @mdev:       DRBD device.
 * @peer_req:   peer request
 * @rw:         flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_conf *mdev,
                             struct drbd_peer_request *peer_req,
                             const unsigned rw, const int fault_type)
{
        struct bio *bios = NULL;
        struct bio *bio;
        struct page *page = peer_req->pages;
        sector_t sector = peer_req->i.sector;
        unsigned ds = peer_req->i.size;
        unsigned n_bios = 0;
        unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
        int err = -ENOMEM;

        /* In most cases, we will only need one bio.  But in case the lower
         * level restrictions happen to be different at this offset on this
         * side than those of the sending peer, we may need to submit the
         * request in more than one bio.
         *
         * Plain bio_alloc is good enough here, this is no DRBD internally
         * generated bio, but a bio allocated on behalf of the peer.
         */
next_bio:
        bio = bio_alloc(GFP_NOIO, nr_pages);
        if (!bio) {
                dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
                goto fail;
        }
        /* > peer_req->i.sector, unless this is the first bio */
        bio->bi_sector = sector;
        bio->bi_bdev = mdev->ldev->backing_bdev;
        bio->bi_rw = rw;
        bio->bi_private = peer_req;
        bio->bi_end_io = drbd_peer_request_endio;

        bio->bi_next = bios;
        bios = bio;
        ++n_bios;

        page_chain_for_each(page) {
                unsigned len = min_t(unsigned, ds, PAGE_SIZE);
                if (!bio_add_page(bio, page, len, 0)) {
                        /* A single page must always be possible!
                         * But in case it fails anyways,
                         * we deal with it, and complain (below). */
                        if (bio->bi_vcnt == 0) {
                                dev_err(DEV,
                                        "bio_add_page failed for len=%u, "
                                        "bi_vcnt=0 (bi_sector=%llu)\n",
                                        len, (unsigned long long)bio->bi_sector);
                                err = -ENOSPC;
                                goto fail;
                        }
                        goto next_bio;
                }
                ds -= len;
                sector += len >> 9;
                --nr_pages;
        }
        D_ASSERT(page == NULL);
        D_ASSERT(ds == 0);

        atomic_set(&peer_req->pending_bios, n_bios);
        do {
                bio = bios;
                bios = bios->bi_next;
                bio->bi_next = NULL;

                drbd_generic_make_request(mdev, fault_type, bio);
        } while (bios);
        return 0;

fail:
        while (bios) {
                bio = bios;
                bios = bios->bi_next;
                bio_put(bio);
        }
        return err;
}
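
/*
 * Editor's note: drbd_submit_peer_request() builds the complete bio list
 * before submitting anything, so a mid-build allocation failure can unwind
 * with bio_put() on the "fail" path without any I/O having been issued.
 */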

static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
                                             struct drbd_peer_request *peer_req)
{
        struct drbd_interval *i = &peer_req->i;

        drbd_remove_interval(&mdev->write_requests, i);
        drbd_clear_interval(i);

        /* Wake up any processes waiting for this peer request to complete.  */
        if (i->waiting)
                wake_up(&mdev->misc_wait);
}

void conn_wait_active_ee_empty(struct drbd_tconn *tconn)
{
        struct drbd_conf *mdev;
        int vnr;

        rcu_read_lock();
        idr_for_each_entry(&tconn->volumes, mdev, vnr) {
                kref_get(&mdev->kref);
                rcu_read_unlock();
                drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
                kref_put(&mdev->kref, &drbd_minor_destroy);
                rcu_read_lock();
        }
        rcu_read_unlock();
}

static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
{
        int rv;
        struct p_barrier *p = pi->data;
        struct drbd_epoch *epoch;

        /* FIXME these are unacked on connection,
         * not a specific (peer)device.
         */
        tconn->current_epoch->barrier_nr = p->barrier;
        tconn->current_epoch->tconn = tconn;
        rv = drbd_may_finish_epoch(tconn, tconn->current_epoch, EV_GOT_BARRIER_NR);

        /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
         * the activity log, which means it would not be resynced in case the
         * R_PRIMARY crashes now.
         * Therefore we must send the barrier_ack after the barrier request was
         * completed. */
        switch (tconn->write_ordering) {
        case WO_none:
                if (rv == FE_RECYCLED)
                        return 0;

                /* receiver context, in the writeout path of the other node.
                 * avoid potential distributed deadlock */
                epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
                if (epoch)
                        break;
                else
                        conn_warn(tconn, "Allocation of an epoch failed, slowing down\n");
                        /* Fall through */

        case WO_bdev_flush:
        case WO_drain_io:
                conn_wait_active_ee_empty(tconn);
                drbd_flush(tconn);

                if (atomic_read(&tconn->current_epoch->epoch_size)) {
                        epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
                        if (epoch)
                                break;
                }

                return 0;
        default:
                conn_err(tconn, "Strangeness in tconn->write_ordering %d\n", tconn->write_ordering);
                return -EIO;
        }

        epoch->flags = 0;
        atomic_set(&epoch->epoch_size, 0);
        atomic_set(&epoch->active, 0);

        spin_lock(&tconn->epoch_lock);
        if (atomic_read(&tconn->current_epoch->epoch_size)) {
                list_add(&epoch->list, &tconn->current_epoch->list);
                tconn->current_epoch = epoch;
                tconn->epochs++;
        } else {
                /* The current_epoch got recycled while we allocated this one... */
                kfree(epoch);
        }
        spin_unlock(&tconn->epoch_lock);

        return 0;
}

/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
              int data_size) __must_hold(local)
{
        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
        struct drbd_peer_request *peer_req;
        struct page *page;
        int dgs, ds, err;
        void *dig_in = mdev->tconn->int_dig_in;
        void *dig_vv = mdev->tconn->int_dig_vv;
        unsigned long *data;

        dgs = 0;
        if (mdev->tconn->peer_integrity_tfm) {
                dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
                /*
                 * FIXME: Receive the incoming digest into the receive buffer
                 *        here, together with its struct p_data?
                 */
                err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
                if (err)
                        return NULL;
                data_size -= dgs;
        }

        if (!expect(IS_ALIGNED(data_size, 512)))
                return NULL;
        if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
                return NULL;

1504         /* even though we trust our peer,
1505          * we sometimes have to double check. */
1506         if (sector + (data_size>>9) > capacity) {
1507                 dev_err(DEV, "request from peer beyond end of local disk: "
1508                         "capacity: %llus < sector: %llus + size: %u\n",
1509                         (unsigned long long)capacity,
1510                         (unsigned long long)sector, data_size);
1511                 return NULL;
1512         }
1513
1514         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1515          * "criss-cross" setup, that might cause write-out on some other DRBD,
1516          * which in turn might block on the other node at this very place.  */
1517         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1518         if (!peer_req)
1519                 return NULL;
1520
1521         if (!data_size)
1522                 return peer_req;
1523
1524         ds = data_size;
1525         page = peer_req->pages;
1526         page_chain_for_each(page) {
1527                 unsigned len = min_t(int, ds, PAGE_SIZE);
1528                 data = kmap(page);
1529                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1530                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1531                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1532                         data[0] = data[0] ^ (unsigned long)-1;
1533                 }
1534                 kunmap(page);
1535                 if (err) {
1536                         drbd_free_peer_req(mdev, peer_req);
1537                         return NULL;
1538                 }
1539                 ds -= len;
1540         }
1541
1542         if (dgs) {
1543                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1544                 if (memcmp(dig_in, dig_vv, dgs)) {
1545                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1546                                 (unsigned long long)sector, data_size);
1547                         drbd_free_peer_req(mdev, peer_req);
1548                         return NULL;
1549                 }
1550         }
1551         mdev->recv_cnt += data_size>>9;
1552         return peer_req;
1553 }
1554
1555 /* drbd_drain_block() just takes a data block
1556  * out of the socket input buffer, and discards it.
1557  */
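     /* Callers use this whenever a payload cannot be stored (e.g. no local
      * disk to write resync data to): the block still has to be read from
      * the socket to keep the receive stream in sync. */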
1558 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1559 {
1560         struct page *page;
1561         int err = 0;
1562         void *data;
1563
1564         if (!data_size)
1565                 return 0;
1566
1567         page = drbd_alloc_pages(mdev, 1, 1);
             if (!page) /* NULL only if we were interrupted by a signal */
                     return -EIO;
1568
1569         data = kmap(page);
1570         while (data_size) {
1571                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1572
1573                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1574                 if (err)
1575                         break;
1576                 data_size -= len;
1577         }
1578         kunmap(page);
1579         drbd_free_pages(mdev, page, 0);
1580         return err;
1581 }
1582
1583 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1584                            sector_t sector, int data_size)
1585 {
1586         struct bio_vec *bvec;
1587         struct bio *bio;
1588         int dgs, err, i, expect;
1589         void *dig_in = mdev->tconn->int_dig_in;
1590         void *dig_vv = mdev->tconn->int_dig_vv;
1591
1592         dgs = 0;
1593         if (mdev->tconn->peer_integrity_tfm) {
1594                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1595                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1596                 if (err)
1597                         return err;
1598                 data_size -= dgs;
1599         }
1600
1601         /* optimistically update recv_cnt.  if receiving fails below,
1602          * we disconnect anyways, and counters will be reset. */
1603         mdev->recv_cnt += data_size>>9;
1604
1605         bio = req->master_bio;
1606         D_ASSERT(sector == bio->bi_sector);
1607
1608         bio_for_each_segment(bvec, bio, i) {
1609                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1610                 expect = min_t(int, data_size, bvec->bv_len);
1611                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1612                 kunmap(bvec->bv_page);
1613                 if (err)
1614                         return err;
1615                 data_size -= expect;
1616         }
1617
1618         if (dgs) {
1619                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1620                 if (memcmp(dig_in, dig_vv, dgs)) {
1621                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1622                         return -EINVAL;
1623                 }
1624         }
1625
1626         D_ASSERT(data_size == 0);
1627         return 0;
1628 }
1629
1630 /*
1631  * e_end_resync_block() is called in asender context via
1632  * drbd_finish_peer_reqs().
1633  */
1634 static int e_end_resync_block(struct drbd_work *w, int unused)
1635 {
1636         struct drbd_peer_request *peer_req =
1637                 container_of(w, struct drbd_peer_request, w);
1638         struct drbd_conf *mdev = w->mdev;
1639         sector_t sector = peer_req->i.sector;
1640         int err;
1641
1642         D_ASSERT(drbd_interval_empty(&peer_req->i));
1643
1644         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1645                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1646                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1647         } else {
1648                 /* Record failure to sync */
1649                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1650
1651                 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1652         }
1653         dec_unacked(mdev);
1654
1655         return err;
1656 }
1657
1658 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1659 {
1660         struct drbd_peer_request *peer_req;
1661
1662         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1663         if (!peer_req)
1664                 goto fail;
1665
1666         dec_rs_pending(mdev);
1667
1668         inc_unacked(mdev);
1669         /* corresponding dec_unacked() in e_end_resync_block()
1670          * respective _drbd_clear_done_ee */
1671
1672         peer_req->w.cb = e_end_resync_block;
1673
1674         spin_lock_irq(&mdev->tconn->req_lock);
1675         list_add(&peer_req->w.list, &mdev->sync_ee);
1676         spin_unlock_irq(&mdev->tconn->req_lock);
1677
1678         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1679         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1680                 return 0;
1681
1682         /* don't care for the reason here */
1683         dev_err(DEV, "submit failed, triggering re-connect\n");
1684         spin_lock_irq(&mdev->tconn->req_lock);
1685         list_del(&peer_req->w.list);
1686         spin_unlock_irq(&mdev->tconn->req_lock);
1687
1688         drbd_free_peer_req(mdev, peer_req);
1689 fail:
1690         put_ldev(mdev);
1691         return -EIO;
1692 }
1693
1694 static struct drbd_request *
1695 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1696              sector_t sector, bool missing_ok, const char *func)
1697 {
1698         struct drbd_request *req;
1699
1700         /* Request object according to our peer */
1701         req = (struct drbd_request *)(unsigned long)id;
1702         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1703                 return req;
1704         if (!missing_ok) {
1705                 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1706                         (unsigned long)id, (unsigned long long)sector);
1707         }
1708         return NULL;
1709 }
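     /* Note: the 64-bit id on the wire is the kernel address of our own
      * struct drbd_request, echoed back by the peer.  The cast above is
      * only trusted because drbd_contains_interval() verifies that an
      * interval at exactly that address is a member of our tree. */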
1710
1711 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1712 {
1713         struct drbd_conf *mdev;
1714         struct drbd_request *req;
1715         sector_t sector;
1716         int err;
1717         struct p_data *p = pi->data;
1718
1719         mdev = vnr_to_mdev(tconn, pi->vnr);
1720         if (!mdev)
1721                 return -EIO;
1722
1723         sector = be64_to_cpu(p->sector);
1724
1725         spin_lock_irq(&mdev->tconn->req_lock);
1726         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1727         spin_unlock_irq(&mdev->tconn->req_lock);
1728         if (unlikely(!req))
1729                 return -EIO;
1730
1731         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1732          * special casing it there for the various failure cases.
1733          * still no race with drbd_fail_pending_reads */
1734         err = recv_dless_read(mdev, req, sector, pi->size);
1735         if (!err)
1736                 req_mod(req, DATA_RECEIVED);
1737         /* else: nothing. handled from drbd_disconnect...
1738          * we must not complete this just yet
1739          * in case we are "on-disconnect: freeze" */
1740
1741         return err;
1742 }
1743
1744 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1745 {
1746         struct drbd_conf *mdev;
1747         sector_t sector;
1748         int err;
1749         struct p_data *p = pi->data;
1750
1751         mdev = vnr_to_mdev(tconn, pi->vnr);
1752         if (!mdev)
1753                 return -EIO;
1754
1755         sector = be64_to_cpu(p->sector);
1756         D_ASSERT(p->block_id == ID_SYNCER);
1757
1758         if (get_ldev(mdev)) {
1759                 /* data is submitted to disk within recv_resync_read.
1760                  * corresponding put_ldev done below on error,
1761                  * or in drbd_peer_request_endio. */
1762                 err = recv_resync_read(mdev, sector, pi->size);
1763         } else {
1764                 if (__ratelimit(&drbd_ratelimit_state))
1765                         dev_err(DEV, "Can not write resync data to local disk.\n");
1766
1767                 err = drbd_drain_block(mdev, pi->size);
1768
1769                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1770         }
1771
1772         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1773
1774         return err;
1775 }
1776
1777 static void restart_conflicting_writes(struct drbd_conf *mdev,
1778                                        sector_t sector, int size)
1779 {
1780         struct drbd_interval *i;
1781         struct drbd_request *req;
1782
1783         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1784                 if (!i->local)
1785                         continue;
1786                 req = container_of(i, struct drbd_request, i);
1787                 if (req->rq_state & RQ_LOCAL_PENDING ||
1788                     !(req->rq_state & RQ_POSTPONED))
1789                         continue;
1790                 /* as it is RQ_POSTPONED, this will cause it to
1791                  * be queued on the retry workqueue. */
1792                 __req_mod(req, CONFLICT_RESOLVED, NULL);
1793         }
1794 }
1795
1796 /*
1797  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1798  */
1799 static int e_end_block(struct drbd_work *w, int cancel)
1800 {
1801         struct drbd_peer_request *peer_req =
1802                 container_of(w, struct drbd_peer_request, w);
1803         struct drbd_conf *mdev = w->mdev;
1804         sector_t sector = peer_req->i.sector;
1805         int err = 0, pcmd;
1806
1807         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1808                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1809                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1810                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1811                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1812                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1813                         err = drbd_send_ack(mdev, pcmd, peer_req);
1814                         if (pcmd == P_RS_WRITE_ACK)
1815                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1816                 } else {
1817                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1818                         /* we expect it to be marked out of sync anyways...
1819                          * maybe assert this?  */
1820                 }
1821                 dec_unacked(mdev);
1822         }
1823         /* we delete from the conflict detection hash _after_ we sent out the
1824          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1825         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1826                 spin_lock_irq(&mdev->tconn->req_lock);
1827                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1828                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1829                 if (peer_req->flags & EE_RESTART_REQUESTS)
1830                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1831                 spin_unlock_irq(&mdev->tconn->req_lock);
1832         } else
1833                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1834
1835         drbd_may_finish_epoch(mdev->tconn, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1836
1837         return err;
1838 }
1839
1840 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1841 {
1842         struct drbd_conf *mdev = w->mdev;
1843         struct drbd_peer_request *peer_req =
1844                 container_of(w, struct drbd_peer_request, w);
1845         int err;
1846
1847         err = drbd_send_ack(mdev, ack, peer_req);
1848         dec_unacked(mdev);
1849
1850         return err;
1851 }
1852
1853 static int e_send_superseded(struct drbd_work *w, int unused)
1854 {
1855         return e_send_ack(w, P_SUPERSEDED);
1856 }
1857
1858 static int e_send_retry_write(struct drbd_work *w, int unused)
1859 {
1860         struct drbd_tconn *tconn = w->mdev->tconn;
1861
1862         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1863                              P_RETRY_WRITE : P_SUPERSEDED);
1864 }
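     /* Before protocol 100 there is no separate P_RETRY_WRITE packet;
      * older peers get P_SUPERSEDED for the retry case as well. */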
1865
1866 static bool seq_greater(u32 a, u32 b)
1867 {
1868         /*
1869          * We assume 32-bit wrap-around here.
1870          * For 24-bit wrap-around, we would have to shift:
1871          *  a <<= 8; b <<= 8;
1872          */
1873         return (s32)a - (s32)b > 0;
1874 }
1875
1876 static u32 seq_max(u32 a, u32 b)
1877 {
1878         return seq_greater(a, b) ? a : b;
1879 }
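     /* Example: seq_greater(1, 0xffffffff) is true, because
      * (s32)(1 - 0xffffffff) == 2 > 0: a freshly wrapped sequence number
      * compares as newer than one just below the wrap point. */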
1880
1881 static bool need_peer_seq(struct drbd_conf *mdev)
1882 {
1883         struct drbd_tconn *tconn = mdev->tconn;
1884         int tp;
1885
1886         /*
1887          * We only need to keep track of the last packet_seq number of our peer
1888          * if we are in dual-primary mode and we have the resolve-conflicts flag set; see
1889          * handle_write_conflicts().
1890          */
1891
1892         rcu_read_lock();
1893         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1894         rcu_read_unlock();
1895
1896         return tp && test_bit(RESOLVE_CONFLICTS, &tconn->flags);
1897 }
1898
1899 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1900 {
1901         unsigned int newest_peer_seq;
1902
1903         if (need_peer_seq(mdev)) {
1904                 spin_lock(&mdev->peer_seq_lock);
1905                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1906                 mdev->peer_seq = newest_peer_seq;
1907                 spin_unlock(&mdev->peer_seq_lock);
1908                 /* wake up only if we actually changed mdev->peer_seq */
1909                 if (peer_seq == newest_peer_seq)
1910                         wake_up(&mdev->seq_wait);
1911         }
1912 }
1913
1914 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1915 {
1916         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1917 }
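     /* s1/s2 are sector numbers, l1/l2 are byte lengths (hence the >>9).
      * Example: overlaps(8, 4096, 12, 512) is true (sectors 8..15 vs 12),
      * while overlaps(8, 4096, 16, 512) is false: the ranges merely touch. */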
1918
1919 /* maybe change sync_ee into interval trees as well? */
1920 static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
1921 {
1922         struct drbd_peer_request *rs_req;
1923         bool rv = false;
1924
1925         spin_lock_irq(&mdev->tconn->req_lock);
1926         list_for_each_entry(rs_req, &mdev->sync_ee, w.list) {
1927                 if (overlaps(peer_req->i.sector, peer_req->i.size,
1928                              rs_req->i.sector, rs_req->i.size)) {
1929                         rv = true;
1930                         break;
1931                 }
1932         }
1933         spin_unlock_irq(&mdev->tconn->req_lock);
1934
1935         return rv;
1936 }
1937
1938 /* Called from receive_Data.
1939  * Synchronize packets on sock with packets on msock.
1940  *
1941  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1942  * packet traveling on msock, they are still processed in the order they have
1943  * been sent.
1944  *
1945  * Note: we don't care for Ack packets overtaking P_DATA packets.
1946  *
1947  * In case packet_seq is larger than mdev->peer_seq number, there are
1948  * outstanding packets on the msock. We wait for them to arrive.
1949  * In case we are the logically next packet, we update mdev->peer_seq
1950  * ourselves. Correctly handles 32bit wrap around.
1951  *
1952  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1953  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1954  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1955  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1956  *
1957  * returns 0 if we may process the packet,
1958  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1959 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1960 {
1961         DEFINE_WAIT(wait);
1962         long timeout;
1963         int ret;
1964
1965         if (!need_peer_seq(mdev))
1966                 return 0;
1967
1968         spin_lock(&mdev->peer_seq_lock);
1969         for (;;) {
1970                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1971                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1972                         ret = 0;
1973                         break;
1974                 }
1975                 if (signal_pending(current)) {
1976                         ret = -ERESTARTSYS;
1977                         break;
1978                 }
1979                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1980                 spin_unlock(&mdev->peer_seq_lock);
1981                 rcu_read_lock();
1982                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1983                 rcu_read_unlock();
1984                 timeout = schedule_timeout(timeout);
1985                 spin_lock(&mdev->peer_seq_lock);
1986                 if (!timeout) {
1987                         ret = -ETIMEDOUT;
1988                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1989                         break;
1990                 }
1991         }
1992         spin_unlock(&mdev->peer_seq_lock);
1993         finish_wait(&mdev->seq_wait, &wait);
1994         return ret;
1995 }
1996
1997 /* see also bio_flags_to_wire();
1998  * we map data packet flags to bio (REQ_*) flags semantically, because
1999  * we may replicate to peers running other kernel versions. */
2000 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
2001 {
2002         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2003                 (dpf & DP_FUA ? REQ_FUA : 0) |
2004                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2005                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2006 }
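     /* E.g. a peer's FLUSH|FUA write arrives as DP_FLUSH|DP_FUA and is
      * resubmitted locally as REQ_FLUSH|REQ_FUA. */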
2007
2008 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
2009                                     unsigned int size)
2010 {
2011         struct drbd_interval *i;
2012
2013     repeat:
2014         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2015                 struct drbd_request *req;
2016                 struct bio_and_error m;
2017
2018                 if (!i->local)
2019                         continue;
2020                 req = container_of(i, struct drbd_request, i);
2021                 if (!(req->rq_state & RQ_POSTPONED))
2022                         continue;
2023                 req->rq_state &= ~RQ_POSTPONED;
2024                 __req_mod(req, NEG_ACKED, &m);
2025                 spin_unlock_irq(&mdev->tconn->req_lock);
2026                 if (m.bio)
2027                         complete_master_bio(mdev, &m);
2028                 spin_lock_irq(&mdev->tconn->req_lock);
2029                 goto repeat;
2030         }
2031 }
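     /* fail_postponed_requests() drops req_lock to complete the master bio,
      * so the interval tree may change under it; restarting the walk after
      * each completion keeps the iteration safe. */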
2032
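     /* Returns 0 if the peer request may be submitted, -ENOENT if it was
      * superseded (or queued for a retry notification) and must not be
      * submitted, or another negative error if waiting for a conflicting
      * request failed. */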
2033 static int handle_write_conflicts(struct drbd_conf *mdev,
2034                                   struct drbd_peer_request *peer_req)
2035 {
2036         struct drbd_tconn *tconn = mdev->tconn;
2037         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &tconn->flags);
2038         sector_t sector = peer_req->i.sector;
2039         const unsigned int size = peer_req->i.size;
2040         struct drbd_interval *i;
2041         bool equal;
2042         int err;
2043
2044         /*
2045          * Inserting the peer request into the write_requests tree will prevent
2046          * new conflicting local requests from being added.
2047          */
2048         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
2049
2050     repeat:
2051         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2052                 if (i == &peer_req->i)
2053                         continue;
2054
2055                 if (!i->local) {
2056                         /*
2057                          * Our peer has sent a conflicting remote request; this
2058                          * should not happen in a two-node setup.  Wait for the
2059                          * earlier peer request to complete.
2060                          */
2061                         err = drbd_wait_misc(mdev, i);
2062                         if (err)
2063                                 goto out;
2064                         goto repeat;
2065                 }
2066
2067                 equal = i->sector == sector && i->size == size;
2068                 if (resolve_conflicts) {
2069                         /*
2070                          * If the peer request is fully contained within the
2071                          * overlapping request, it can be considered overwritten
2072                          * and thus superseded; otherwise, it will be retried
2073                          * once all overlapping requests have completed.
2074                          */
2075                         bool superseded = i->sector <= sector && i->sector +
2076                                        (i->size >> 9) >= sector + (size >> 9);
2077
2078                         if (!equal)
2079                                 dev_alert(DEV, "Concurrent writes detected: "
2080                                                "local=%llus +%u, remote=%llus +%u, "
2081                                                "assuming %s came first\n",
2082                                           (unsigned long long)i->sector, i->size,
2083                                           (unsigned long long)sector, size,
2084                                           superseded ? "local" : "remote");
2085
2086                         inc_unacked(mdev);
2087                         peer_req->w.cb = superseded ? e_send_superseded :
2088                                                    e_send_retry_write;
2089                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2090                         wake_asender(mdev->tconn);
2091
2092                         err = -ENOENT;
2093                         goto out;
2094                 } else {
2095                         struct drbd_request *req =
2096                                 container_of(i, struct drbd_request, i);
2097
2098                         if (!equal)
2099                                 dev_alert(DEV, "Concurrent writes detected: "
2100                                                "local=%llus +%u, remote=%llus +%u\n",
2101                                           (unsigned long long)i->sector, i->size,
2102                                           (unsigned long long)sector, size);
2103
2104                         if (req->rq_state & RQ_LOCAL_PENDING ||
2105                             !(req->rq_state & RQ_POSTPONED)) {
2106                                 /*
2107                                  * Wait for the node with the discard flag to
2108                                  * decide if this request has been superseded
2109                                  * or needs to be retried.
2110                                  * Requests that have been superseded will
2111                                  * disappear from the write_requests tree.
2112                                  *
2113                                  * In addition, wait for the conflicting
2114                                  * request to finish locally before submitting
2115                                  * the conflicting peer request.
2116                                  */
2117                                 err = drbd_wait_misc(mdev, &req->i);
2118                                 if (err) {
2119                                         _conn_request_state(mdev->tconn,
2120                                                             NS(conn, C_TIMEOUT),
2121                                                             CS_HARD);
2122                                         fail_postponed_requests(mdev, sector, size);
2123                                         goto out;
2124                                 }
2125                                 goto repeat;
2126                         }
2127                         /*
2128                          * Remember to restart the conflicting requests after
2129                          * the new peer request has completed.
2130                          */
2131                         peer_req->flags |= EE_RESTART_REQUESTS;
2132                 }
2133         }
2134         err = 0;
2135
2136     out:
2137         if (err)
2138                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2139         return err;
2140 }
2141
2142 /* mirrored write */
2143 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2144 {
2145         struct drbd_conf *mdev;
2146         sector_t sector;
2147         struct drbd_peer_request *peer_req;
2148         struct p_data *p = pi->data;
2149         u32 peer_seq = be32_to_cpu(p->seq_num);
2150         int rw = WRITE;
2151         u32 dp_flags;
2152         int err, tp;
2153
2154         mdev = vnr_to_mdev(tconn, pi->vnr);
2155         if (!mdev)
2156                 return -EIO;
2157
2158         if (!get_ldev(mdev)) {
2159                 int err2;
2160
2161                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2162                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2163                 atomic_inc(&tconn->current_epoch->epoch_size);
2164                 err2 = drbd_drain_block(mdev, pi->size);
2165                 if (!err)
2166                         err = err2;
2167                 return err;
2168         }
2169
2170         /*
2171          * Corresponding put_ldev done either below (on various errors), or in
2172          * drbd_peer_request_endio, if we successfully submit the data at the
2173          * end of this function.
2174          */
2175
2176         sector = be64_to_cpu(p->sector);
2177         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2178         if (!peer_req) {
2179                 put_ldev(mdev);
2180                 return -EIO;
2181         }
2182
2183         peer_req->w.cb = e_end_block;
2184
2185         dp_flags = be32_to_cpu(p->dp_flags);
2186         rw |= wire_flags_to_bio(mdev, dp_flags);
2187         if (peer_req->pages == NULL) {
2188                 D_ASSERT(peer_req->i.size == 0);
2189                 D_ASSERT(dp_flags & DP_FLUSH);
2190         }
2191
2192         if (dp_flags & DP_MAY_SET_IN_SYNC)
2193                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2194
2195         spin_lock(&tconn->epoch_lock);
2196         peer_req->epoch = tconn->current_epoch;
2197         atomic_inc(&peer_req->epoch->epoch_size);
2198         atomic_inc(&peer_req->epoch->active);
2199         spin_unlock(&tconn->epoch_lock);
2200
2201         rcu_read_lock();
2202         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2203         rcu_read_unlock();
2204         if (tp) {
2205                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2206                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2207                 if (err)
2208                         goto out_interrupted;
2209                 spin_lock_irq(&mdev->tconn->req_lock);
2210                 err = handle_write_conflicts(mdev, peer_req);
2211                 if (err) {
2212                         spin_unlock_irq(&mdev->tconn->req_lock);
2213                         if (err == -ENOENT) {
2214                                 put_ldev(mdev);
2215                                 return 0;
2216                         }
2217                         goto out_interrupted;
2218                 }
2219         } else
2220                 spin_lock_irq(&mdev->tconn->req_lock);
2221         list_add(&peer_req->w.list, &mdev->active_ee);
2222         spin_unlock_irq(&mdev->tconn->req_lock);
2223
2224         if (mdev->state.conn == C_SYNC_TARGET)
2225                 wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, peer_req));
2226
2227         if (mdev->tconn->agreed_pro_version < 100) {
2228                 rcu_read_lock();
2229                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2230                 case DRBD_PROT_C:
2231                         dp_flags |= DP_SEND_WRITE_ACK;
2232                         break;
2233                 case DRBD_PROT_B:
2234                         dp_flags |= DP_SEND_RECEIVE_ACK;
2235                         break;
2236                 }
2237                 rcu_read_unlock();
2238         }
2239
2240         if (dp_flags & DP_SEND_WRITE_ACK) {
2241                 peer_req->flags |= EE_SEND_WRITE_ACK;
2242                 inc_unacked(mdev);
2243                 /* corresponding dec_unacked() in e_end_block()
2244                  * respective _drbd_clear_done_ee */
2245         }
2246
2247         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2248                 /* I really don't like it that the receiver thread
2249                  * sends on the msock, but anyways */
2250                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2251         }
2252
2253         if (mdev->state.pdsk < D_INCONSISTENT) {
2254                 /* In case we have the only usable disk of the cluster,
                      * mark the range out of sync and cover it in the activity log */
2255                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2256                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2257                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2258                 drbd_al_begin_io(mdev, &peer_req->i);
2259         }
2260
2261         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2262         if (!err)
2263                 return 0;
2264
2265         /* don't care for the reason here */
2266         dev_err(DEV, "submit failed, triggering re-connect\n");
2267         spin_lock_irq(&mdev->tconn->req_lock);
2268         list_del(&peer_req->w.list);
2269         drbd_remove_epoch_entry_interval(mdev, peer_req);
2270         spin_unlock_irq(&mdev->tconn->req_lock);
2271         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2272                 drbd_al_complete_io(mdev, &peer_req->i);
2273
2274 out_interrupted:
2275         drbd_may_finish_epoch(tconn, peer_req->epoch, EV_PUT + EV_CLEANUP);
2276         put_ldev(mdev);
2277         drbd_free_peer_req(mdev, peer_req);
2278         return err;
2279 }
2280
2281 /* We may throttle resync, if the lower device seems to be busy,
2282  * and current sync rate is above c_min_rate.
2283  *
2284  * To decide whether or not the lower device is busy, we use a scheme similar
2285  * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
2286  * (more than 64 sectors) of activity we cannot account for with our own resync
2287  * activity, it obviously is "busy".
2288  *
2289  * The current sync rate used here uses only the most recent two step marks,
2290  * to have a short time average so we can react faster.
2291  */
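     /* Worked example, assuming the usual 4 KiB bitmap granularity
      * (BM_BLOCK_SIZE), i.e. Bit2KB(x) == 4*x: with the two most recent
      * marks dt == 6 seconds apart and db == 24576 bits cleared in between,
      * dbdt == Bit2KB(24576/6) == 16384 KiB/s; a c_min_rate of 4096 KiB/s
      * would throttle. */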
2292 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2293 {
2294         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2295         unsigned long db, dt, dbdt;
2296         struct lc_element *tmp;
2297         int curr_events;
2298         int throttle = 0;
2299         unsigned int c_min_rate;
2300
2301         rcu_read_lock();
2302         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2303         rcu_read_unlock();
2304
2305         /* feature disabled? */
2306         if (c_min_rate == 0)
2307                 return 0;
2308
2309         spin_lock_irq(&mdev->al_lock);
2310         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2311         if (tmp) {
2312                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2313                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2314                         spin_unlock_irq(&mdev->al_lock);
2315                         return 0;
2316                 }
2317                 /* Do not slow down if app IO is already waiting for this extent */
2318         }
2319         spin_unlock_irq(&mdev->al_lock);
2320
2321         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2322                       (int)part_stat_read(&disk->part0, sectors[1]) -
2323                         atomic_read(&mdev->rs_sect_ev);
2324
2325         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2326                 unsigned long rs_left;
2327                 int i;
2328
2329                 mdev->rs_last_events = curr_events;
2330
2331                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2332                  * approx. */
2333                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2334
2335                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2336                         rs_left = mdev->ov_left;
2337                 else
2338                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2339
2340                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2341                 if (!dt)
2342                         dt++;
2343                 db = mdev->rs_mark_left[i] - rs_left;
2344                 dbdt = Bit2KB(db/dt);
2345
2346                 if (dbdt > c_min_rate)
2347                         throttle = 1;
2348         }
2349         return throttle;
2350 }
2351
2352
2353 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2354 {
2355         struct drbd_conf *mdev;
2356         sector_t sector;
2357         sector_t capacity;
2358         struct drbd_peer_request *peer_req;
2359         struct digest_info *di = NULL;
2360         int size, verb;
2361         unsigned int fault_type;
2362         struct p_block_req *p = pi->data;
2363
2364         mdev = vnr_to_mdev(tconn, pi->vnr);
2365         if (!mdev)
2366                 return -EIO;
2367         capacity = drbd_get_capacity(mdev->this_bdev);
2368
2369         sector = be64_to_cpu(p->sector);
2370         size   = be32_to_cpu(p->blksize);
2371
2372         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2373                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2374                                 (unsigned long long)sector, size);
2375                 return -EINVAL;
2376         }
2377         if (sector + (size>>9) > capacity) {
2378                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2379                                 (unsigned long long)sector, size);
2380                 return -EINVAL;
2381         }
2382
2383         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2384                 verb = 1;
2385                 switch (pi->cmd) {
2386                 case P_DATA_REQUEST:
2387                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2388                         break;
2389                 case P_RS_DATA_REQUEST:
2390                 case P_CSUM_RS_REQUEST:
2391                 case P_OV_REQUEST:
2392                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
2393                         break;
2394                 case P_OV_REPLY:
2395                         verb = 0;
2396                         dec_rs_pending(mdev);
2397                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2398                         break;
2399                 default:
2400                         BUG();
2401                 }
2402                 if (verb && __ratelimit(&drbd_ratelimit_state))
2403                         dev_err(DEV, "Can not satisfy peer's read request, "
2404                             "no local data.\n");
2405
2406                 /* drain the payload, if any */
2407                 return drbd_drain_block(mdev, pi->size);
2408         }
2409
2410         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2411          * "criss-cross" setup, that might cause write-out on some other DRBD,
2412          * which in turn might block on the other node at this very place.  */
2413         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2414         if (!peer_req) {
2415                 put_ldev(mdev);
2416                 return -ENOMEM;
2417         }
2418
2419         switch (pi->cmd) {
2420         case P_DATA_REQUEST:
2421                 peer_req->w.cb = w_e_end_data_req;
2422                 fault_type = DRBD_FAULT_DT_RD;
2423                 /* application IO, don't drbd_rs_begin_io */
2424                 goto submit;
2425
2426         case P_RS_DATA_REQUEST:
2427                 peer_req->w.cb = w_e_end_rsdata_req;
2428                 fault_type = DRBD_FAULT_RS_RD;
2429                 /* used in the sector offset progress display */
2430                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2431                 break;
2432
2433         case P_OV_REPLY:
2434         case P_CSUM_RS_REQUEST:
2435                 fault_type = DRBD_FAULT_RS_RD;
2436                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2437                 if (!di)
2438                         goto out_free_e;
2439
2440                 di->digest_size = pi->size;
2441                 di->digest = (((char *)di)+sizeof(struct digest_info));
2442
2443                 peer_req->digest = di;
2444                 peer_req->flags |= EE_HAS_DIGEST;
2445
2446                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2447                         goto out_free_e;
2448
2449                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2450                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2451                         peer_req->w.cb = w_e_end_csum_rs_req;
2452                         /* used in the sector offset progress display */
2453                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2454                 } else if (pi->cmd == P_OV_REPLY) {
2455                         /* track progress, we may need to throttle */
2456                         atomic_add(size >> 9, &mdev->rs_sect_in);
2457                         peer_req->w.cb = w_e_end_ov_reply;
2458                         dec_rs_pending(mdev);
2459                         /* drbd_rs_begin_io done when we sent this request,
2460                          * but accounting still needs to be done. */
2461                         goto submit_for_resync;
2462                 }
2463                 break;
2464
2465         case P_OV_REQUEST:
2466                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2467                     mdev->tconn->agreed_pro_version >= 90) {
2468                         unsigned long now = jiffies;
2469                         int i;
2470                         mdev->ov_start_sector = sector;
2471                         mdev->ov_position = sector;
2472                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2473                         mdev->rs_total = mdev->ov_left;
2474                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2475                                 mdev->rs_mark_left[i] = mdev->ov_left;
2476                                 mdev->rs_mark_time[i] = now;
2477                         }
2478                         dev_info(DEV, "Online Verify start sector: %llu\n",
2479                                         (unsigned long long)sector);
2480                 }
2481                 peer_req->w.cb = w_e_end_ov_req;
2482                 fault_type = DRBD_FAULT_RS_RD;
2483                 break;
2484
2485         default:
2486                 BUG();
2487         }
2488
2489         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2490          * wrt the receiver, but it is not as straightforward as it may seem.
2491          * Various places in the resync start and stop logic assume resync
2492          * requests are processed in order, requeuing this on the worker thread
2493          * introduces a bunch of new code for synchronization between threads.
2494          *
2495          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2496          * "forever", throttling after drbd_rs_begin_io will lock that extent
2497          * for application writes for the same time.  For now, just throttle
2498          * here, where the rest of the code expects the receiver to sleep for
2499          * a while, anyways.
2500          */
2501
2502         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2503          * this defers syncer requests for some time, before letting at least
2504  * one request through.  The resync controller on the receiving side
2505          * will adapt to the incoming rate accordingly.
2506          *
2507          * We cannot throttle here if remote is Primary/SyncTarget:
2508          * we would also throttle its application reads.
2509          * In that case, throttling is done on the SyncTarget only.
2510          */
2511         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2512                 schedule_timeout_uninterruptible(HZ/10);
2513         if (drbd_rs_begin_io(mdev, sector))
2514                 goto out_free_e;
2515
2516 submit_for_resync:
2517         atomic_add(size >> 9, &mdev->rs_sect_ev);
2518
2519 submit:
2520         inc_unacked(mdev);
2521         spin_lock_irq(&mdev->tconn->req_lock);
2522         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2523         spin_unlock_irq(&mdev->tconn->req_lock);
2524
2525         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2526                 return 0;
2527
2528         /* don't care for the reason here */
2529         dev_err(DEV, "submit failed, triggering re-connect\n");
2530         spin_lock_irq(&mdev->tconn->req_lock);
2531         list_del(&peer_req->w.list);
2532         spin_unlock_irq(&mdev->tconn->req_lock);
2533         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2534
2535 out_free_e:
2536         put_ldev(mdev);
2537         drbd_free_peer_req(mdev, peer_req);
2538         return -EIO;
2539 }
2540
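     /* Return convention of the drbd_asb_recover_*p() helpers:
      *    1  resolve in our favor (discard the remote data)
      *   -1  resolve in the peer's favor (discard our local data)
      * -100  no automatic resolution possible */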
2541 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2542 {
2543         int self, peer, rv = -100;
2544         unsigned long ch_self, ch_peer;
2545         enum drbd_after_sb_p after_sb_0p;
2546
2547         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2548         peer = mdev->p_uuid[UI_BITMAP] & 1;
2549
2550         ch_peer = mdev->p_uuid[UI_SIZE];
2551         ch_self = mdev->comm_bm_set;
2552
2553         rcu_read_lock();
2554         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2555         rcu_read_unlock();
2556         switch (after_sb_0p) {
2557         case ASB_CONSENSUS:
2558         case ASB_DISCARD_SECONDARY:
2559         case ASB_CALL_HELPER:
2560         case ASB_VIOLENTLY:
2561                 dev_err(DEV, "Configuration error.\n");
2562                 break;
2563         case ASB_DISCONNECT:
2564                 break;
2565         case ASB_DISCARD_YOUNGER_PRI:
2566                 if (self == 0 && peer == 1) {
2567                         rv = -1;
2568                         break;
2569                 }
2570                 if (self == 1 && peer == 0) {
2571                         rv =  1;
2572                         break;
2573                 }
2574                 /* Else fall through to one of the other strategies... */
2575         case ASB_DISCARD_OLDER_PRI:
2576                 if (self == 0 && peer == 1) {
2577                         rv = 1;
2578                         break;
2579                 }
2580                 if (self == 1 && peer == 0) {
2581                         rv = -1;
2582                         break;
2583                 }
2584                 /* Else fall through to one of the other strategies... */
2585                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2586                      "Using discard-least-changes instead\n");
2587         case ASB_DISCARD_ZERO_CHG:
2588                 if (ch_peer == 0 && ch_self == 0) {
2589                         rv = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags)
2590                                 ? -1 : 1;
2591                         break;
2592                 } else {
2593                         if (ch_peer == 0) { rv =  1; break; }
2594                         if (ch_self == 0) { rv = -1; break; }
2595                 }
2596                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2597                         break;
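                     /* else: fall through to discard-least-changes */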
2598         case ASB_DISCARD_LEAST_CHG:
2599                 if      (ch_self < ch_peer)
2600                         rv = -1;
2601                 else if (ch_self > ch_peer)
2602                         rv =  1;
2603                 else /* ( ch_self == ch_peer ) */
2604                      /* Well, then use something else. */
2605                         rv = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags)
2606                                 ? -1 : 1;
2607                 break;
2608         case ASB_DISCARD_LOCAL:
2609                 rv = -1;
2610                 break;
2611         case ASB_DISCARD_REMOTE:
2612                 rv =  1;
2613         }
2614
2615         return rv;
2616 }
2617
2618 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2619 {
2620         int hg, rv = -100;
2621         enum drbd_after_sb_p after_sb_1p;
2622
2623         rcu_read_lock();
2624         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2625         rcu_read_unlock();
2626         switch (after_sb_1p) {
2627         case ASB_DISCARD_YOUNGER_PRI:
2628         case ASB_DISCARD_OLDER_PRI:
2629         case ASB_DISCARD_LEAST_CHG:
2630         case ASB_DISCARD_LOCAL:
2631         case ASB_DISCARD_REMOTE:
2632         case ASB_DISCARD_ZERO_CHG:
2633                 dev_err(DEV, "Configuration error.\n");
2634                 break;
2635         case ASB_DISCONNECT:
2636                 break;
2637         case ASB_CONSENSUS:
2638                 hg = drbd_asb_recover_0p(mdev);
2639                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2640                         rv = hg;
2641                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2642                         rv = hg;
2643                 break;
2644         case ASB_VIOLENTLY:
2645                 rv = drbd_asb_recover_0p(mdev);
2646                 break;
2647         case ASB_DISCARD_SECONDARY:
2648                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2649         case ASB_CALL_HELPER:
2650                 hg = drbd_asb_recover_0p(mdev);
2651                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2652                         enum drbd_state_rv rv2;
2653
2654                         drbd_set_role(mdev, R_SECONDARY, 0);
2655                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2656                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2657                           * we do not need to wait for the after state change work either. */
2658                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2659                         if (rv2 != SS_SUCCESS) {
2660                                 drbd_khelper(mdev, "pri-lost-after-sb");
2661                         } else {
2662                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2663                                 rv = hg;
2664                         }
2665                 } else
2666                         rv = hg;
2667         }
2668
2669         return rv;
2670 }
2671
2672 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2673 {
2674         int hg, rv = -100;
2675         enum drbd_after_sb_p after_sb_2p;
2676
2677         rcu_read_lock();
2678         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2679         rcu_read_unlock();
2680         switch (after_sb_2p) {
2681         case ASB_DISCARD_YOUNGER_PRI:
2682         case ASB_DISCARD_OLDER_PRI:
2683         case ASB_DISCARD_LEAST_CHG:
2684         case ASB_DISCARD_LOCAL:
2685         case ASB_DISCARD_REMOTE:
2686         case ASB_CONSENSUS:
2687         case ASB_DISCARD_SECONDARY:
2688         case ASB_DISCARD_ZERO_CHG:
2689                 dev_err(DEV, "Configuration error.\n");
2690                 break;
2691         case ASB_VIOLENTLY:
2692                 rv = drbd_asb_recover_0p(mdev);
2693                 break;
2694         case ASB_DISCONNECT:
2695                 break;
2696         case ASB_CALL_HELPER:
2697                 hg = drbd_asb_recover_0p(mdev);
2698                 if (hg == -1) {
2699                         enum drbd_state_rv rv2;
2700
2701                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2702                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2703                           * we do not need to wait for the after state change work either. */
2704                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2705                         if (rv2 != SS_SUCCESS) {
2706                                 drbd_khelper(mdev, "pri-lost-after-sb");
2707                         } else {
2708                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2709                                 rv = hg;
2710                         }
2711                 } else
2712                         rv = hg;
2713         }
2714
2715         return rv;
2716 }
2717
2718 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2719                            u64 bits, u64 flags)
2720 {
2721         if (!uuid) {
2722                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2723                 return;
2724         }
2725         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2726              text,
2727              (unsigned long long)uuid[UI_CURRENT],
2728              (unsigned long long)uuid[UI_BITMAP],
2729              (unsigned long long)uuid[UI_HISTORY_START],
2730              (unsigned long long)uuid[UI_HISTORY_END],
2731              (unsigned long long)bits,
2732              (unsigned long long)flags);
2733 }
2734
2735 /*
2736   100   after split brain try auto recover
2737     2   C_SYNC_SOURCE set BitMap
2738     1   C_SYNC_SOURCE use BitMap
2739     0   no Sync
2740    -1   C_SYNC_TARGET use BitMap
2741    -2   C_SYNC_TARGET set BitMap
2742  -100   after split brain, disconnect
2743 -1000   unrelated data
2744 -1091   requires proto 91
2745 -1096   requires proto 96
2746  */
2747 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2748 {
2749         u64 self, peer;
2750         int i, j;
2751
2752         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2753         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2754
2755         *rule_nr = 10;
2756         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2757                 return 0;
2758
2759         *rule_nr = 20;
2760         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2761              peer != UUID_JUST_CREATED)
2762                 return -2;
2763
2764         *rule_nr = 30;
2765         if (self != UUID_JUST_CREATED &&
2766             (peer == UUID_JUST_CREATED || peer == (u64)0))
2767                 return 2;
2768
2769         if (self == peer) {
2770                 int rct, dc; /* roles at crash time */
2771
2772                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2773
2774                         if (mdev->tconn->agreed_pro_version < 91)
2775                                 return -1091;
2776
2777                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2778                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2779                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2780                                 drbd_uuid_move_history(mdev);
2781                                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
2782                                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
2783
2784                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2785                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2786                                 *rule_nr = 34;
2787                         } else {
2788                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2789                                 *rule_nr = 36;
2790                         }
2791
2792                         return 1;
2793                 }
2794
2795                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2796
2797                         if (mdev->tconn->agreed_pro_version < 91)
2798                                 return -1091;
2799
2800                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2801                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2802                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2803
2804                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2805                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2806                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2807
2808                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2809                                 *rule_nr = 35;
2810                         } else {
2811                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2812                                 *rule_nr = 37;
2813                         }
2814
2815                         return -1;
2816                 }
2817
2818                 /* Common power [off|failure] */
2819                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2820                         (mdev->p_uuid[UI_FLAGS] & 2);
2821                 /* lowest bit is set when we were primary,
2822                  * next bit (weight 2) is set when peer was primary */
2823                 *rule_nr = 40;
2824
2825                 switch (rct) {
2826                 case 0: /* !self_pri && !peer_pri */ return 0;
2827                 case 1: /*  self_pri && !peer_pri */ return 1;
2828                 case 2: /* !self_pri &&  peer_pri */ return -1;
2829                 case 3: /*  self_pri &&  peer_pri */
2830                         dc = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags);
2831                         return dc ? -1 : 1;
2832                 }
2833         }
2834
2835         *rule_nr = 50;
2836         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2837         if (self == peer)
2838                 return -1;
2839
2840         *rule_nr = 51;
2841         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2842         if (self == peer) {
2843                 if (mdev->tconn->agreed_pro_version < 96 ?
2844                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2845                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2846                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2847                         /* The last P_SYNC_UUID did not get through. Undo the peer's UUID
2848                            modifications from its last start of resync as sync source. */
2849
2850                         if (mdev->tconn->agreed_pro_version < 91)
2851                                 return -1091;
2852
2853                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2854                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2855
2856                         dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2857                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2858
2859                         return -1;
2860                 }
2861         }
2862
2863         *rule_nr = 60;
2864         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2865         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2866                 peer = mdev->p_uuid[i] & ~((u64)1);
2867                 if (self == peer)
2868                         return -2;
2869         }
2870
2871         *rule_nr = 70;
2872         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2873         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2874         if (self == peer)
2875                 return 1;
2876
2877         *rule_nr = 71;
2878         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2879         if (self == peer) {
2880                 if (mdev->tconn->agreed_pro_version < 96 ?
2881                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2882                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2883                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2884                         /* The last P_SYNC_UUID did not get through. Undo our own UUID
2885                            modifications from our last start of resync as sync source. */
2886
2887                         if (mdev->tconn->agreed_pro_version < 91)
2888                                 return -1091;
2889
2890                         __drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2891                         __drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2892
2893                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2894                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2895                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2896
2897                         return 1;
2898                 }
2899         }
2900
2901
2902         *rule_nr = 80;
2903         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2904         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2905                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2906                 if (self == peer)
2907                         return 2;
2908         }
2909
2910         *rule_nr = 90;
2911         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2912         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2913         if (self == peer && self != ((u64)0))
2914                 return 100;
2915
2916         *rule_nr = 100;
2917         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2918                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2919                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2920                         peer = mdev->p_uuid[j] & ~((u64)1);
2921                         if (self == peer)
2922                                 return -100;
2923                 }
2924         }
2925
2926         return -1000;
2927 }
2928
2929 /* drbd_sync_handshake() returns the new conn state on success, or
2930    C_MASK (-1) on failure.
2931  */
2932 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2933                                            enum drbd_disk_state peer_disk) __must_hold(local)
2934 {
2935         enum drbd_conns rv = C_MASK;
2936         enum drbd_disk_state mydisk;
2937         struct net_conf *nc;
2938         int hg, rule_nr, rr_conflict, tentative;
2939
2940         mydisk = mdev->state.disk;
2941         if (mydisk == D_NEGOTIATING)
2942                 mydisk = mdev->new_state_tmp.disk;
2943
2944         dev_info(DEV, "drbd_sync_handshake:\n");
2945
2946         spin_lock_irq(&mdev->ldev->md.uuid_lock);
2947         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2948         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2949                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2950
2951         hg = drbd_uuid_compare(mdev, &rule_nr);
2952         spin_unlock_irq(&mdev->ldev->md.uuid_lock);
2953
2954         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2955
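        /* hg follows the convention of the table above drbd_uuid_compare():
         * the sign picks the sync direction (positive: we become sync source),
         * |hg| == 2 requests a full sync, |hg| == 100 means split brain, and
         * values at or below -1000 are fatal (unrelated data, or a protocol
         * version too old to resolve the situation). */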
2956         if (hg == -1000) {
2957                 dev_alert(DEV, "Unrelated data, aborting!\n");
2958                 return C_MASK;
2959         }
2960         if (hg < -1000) {
2961                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2962                 return C_MASK;
2963         }
2964
2965         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2966             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2967                 int f = (hg == -100) || abs(hg) == 2;
2968                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2969                 if (f)
2970                         hg = hg*2;
2971                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2972                      hg > 0 ? "source" : "target");
2973         }
2974
2975         if (abs(hg) == 100)
2976                 drbd_khelper(mdev, "initial-split-brain");
2977
2978         rcu_read_lock();
2979         nc = rcu_dereference(mdev->tconn->net_conf);
2980
2981         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2982                 int pcount = (mdev->state.role == R_PRIMARY)
2983                            + (peer_role == R_PRIMARY);
2984                 int forced = (hg == -100);
2985
2986                 switch (pcount) {
2987                 case 0:
2988                         hg = drbd_asb_recover_0p(mdev);
2989                         break;
2990                 case 1:
2991                         hg = drbd_asb_recover_1p(mdev);
2992                         break;
2993                 case 2:
2994                         hg = drbd_asb_recover_2p(mdev);
2995                         break;
2996                 }
2997                 if (abs(hg) < 100) {
2998                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2999                              "automatically solved. Sync from %s node\n",
3000                              pcount, (hg < 0) ? "peer" : "this");
3001                         if (forced) {
3002                                 dev_warn(DEV, "Doing a full sync, since"
3003                                      " UUIDs were ambiguous.\n");
3004                                 hg = hg*2;
3005                         }
3006                 }
3007         }
3008
3009         if (hg == -100) {
3010                 if (test_bit(DISCARD_MY_DATA, &mdev->flags) && !(mdev->p_uuid[UI_FLAGS]&1))
3011                         hg = -1;
3012                 if (!test_bit(DISCARD_MY_DATA, &mdev->flags) && (mdev->p_uuid[UI_FLAGS]&1))
3013                         hg = 1;
3014
3015                 if (abs(hg) < 100)
3016                         dev_warn(DEV, "Split-Brain detected, manually solved. "
3017                              "Sync from %s node\n",
3018                              (hg < 0) ? "peer" : "this");
3019         }
3020         rr_conflict = nc->rr_conflict;
3021         tentative = nc->tentative;
3022         rcu_read_unlock();
3023
3024         if (hg == -100) {
3025                 /* FIXME this log message is not correct if we end up here
3026                  * after an attempted attach on a diskless node.
3027                  * We just refuse to attach -- well, we drop the "connection"
3028                  * to that disk, in a way... */
3029                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
3030                 drbd_khelper(mdev, "split-brain");
3031                 return C_MASK;
3032         }
3033
3034         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3035                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
3036                 return C_MASK;
3037         }
3038
3039         if (hg < 0 && /* intentionally, we do not use mydisk here. */
3040             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
3041                 switch (rr_conflict) {
3042                 case ASB_CALL_HELPER:
3043                         drbd_khelper(mdev, "pri-lost");
3044                         /* fall through */
3045                 case ASB_DISCONNECT:
3046                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
3047                         return C_MASK;
3048                 case ASB_VIOLENTLY:
3049                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
3050                              " assumption\n");
3051                 }
3052         }
3053
3054         if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
3055                 if (hg == 0)
3056                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
3057                 else
3058                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
3059                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3060                                  abs(hg) >= 2 ? "full" : "bit-map based");
3061                 return C_MASK;
3062         }
3063
3064         if (abs(hg) >= 2) {
3065                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3066                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3067                                         BM_LOCKED_SET_ALLOWED))
3068                         return C_MASK;
3069         }
3070
3071         if (hg > 0) { /* become sync source. */
3072                 rv = C_WF_BITMAP_S;
3073         } else if (hg < 0) { /* become sync target */
3074                 rv = C_WF_BITMAP_T;
3075         } else {
3076                 rv = C_CONNECTED;
3077                 if (drbd_bm_total_weight(mdev)) {
3078                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
3079                              drbd_bm_total_weight(mdev));
3080                 }
3081         }
3082
3083         return rv;
3084 }
3085
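/* The peer reports its after-split-brain policies from its own point of view:
 * its "discard local" is our "discard remote" and vice versa. Mirror those two
 * values so they can be compared directly against our configuration. */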
3086 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3087 {
3088         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3089         if (peer == ASB_DISCARD_REMOTE)
3090                 return ASB_DISCARD_LOCAL;
3091
3092         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3093         if (peer == ASB_DISCARD_LOCAL)
3094                 return ASB_DISCARD_REMOTE;
3095
3096         /* everything else is valid if they are equal on both sides. */
3097         return peer;
3098 }
3099
3100 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3101 {
3102         struct p_protocol *p = pi->data;
3103         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3104         int p_proto, p_discard_my_data, p_two_primaries, cf;
3105         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3106         char integrity_alg[SHARED_SECRET_MAX] = "";
3107         struct crypto_hash *peer_integrity_tfm = NULL;
3108         void *int_dig_in = NULL, *int_dig_vv = NULL;
3109
3110         p_proto         = be32_to_cpu(p->protocol);
3111         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3112         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3113         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3114         p_two_primaries = be32_to_cpu(p->two_primaries);
3115         cf              = be32_to_cpu(p->conn_flags);
3116         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3117
3118         if (tconn->agreed_pro_version >= 87) {
3119                 int err;
3120
3121                 if (pi->size > sizeof(integrity_alg))
3122                         return -EIO;
3123                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3124                 if (err)
3125                         return err;
3126                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3127         }
3128
3129         if (pi->cmd != P_PROTOCOL_UPDATE) {
3130                 clear_bit(CONN_DRY_RUN, &tconn->flags);
3131
3132                 if (cf & CF_DRY_RUN)
3133                         set_bit(CONN_DRY_RUN, &tconn->flags);
3134
3135                 rcu_read_lock();
3136                 nc = rcu_dereference(tconn->net_conf);
3137
3138                 if (p_proto != nc->wire_protocol) {
3139                         conn_err(tconn, "incompatible %s settings\n", "protocol");
3140                         goto disconnect_rcu_unlock;
3141                 }
3142
3143                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3144                         conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3145                         goto disconnect_rcu_unlock;
3146                 }
3147
3148                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3149                         conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3150                         goto disconnect_rcu_unlock;
3151                 }
3152
3153                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3154                         conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3155                         goto disconnect_rcu_unlock;
3156                 }
3157
3158                 if (p_discard_my_data && nc->discard_my_data) {
3159                         conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3160                         goto disconnect_rcu_unlock;
3161                 }
3162
3163                 if (p_two_primaries != nc->two_primaries) {
3164                         conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3165                         goto disconnect_rcu_unlock;
3166                 }
3167
3168                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3169                         conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3170                         goto disconnect_rcu_unlock;
3171                 }
3172
3173                 rcu_read_unlock();
3174         }
3175
3176         if (integrity_alg[0]) {
3177                 int hash_size;
3178
3179                 /*
3180                  * We can only change the peer data integrity algorithm
3181                  * here.  Changing our own data integrity algorithm
3182                  * requires that we send a P_PROTOCOL_UPDATE packet at
3183                  * the same time; otherwise, the peer has no way to
3184                  * tell between which packets the algorithm should
3185                  * change.
3186                  */
3187
3188                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3189                 if (!peer_integrity_tfm) {
3190                         conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3191                                  integrity_alg);
3192                         goto disconnect;
3193                 }
3194
3195                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3196                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3197                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3198                 if (!(int_dig_in && int_dig_vv)) {
3199                         conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3200                         goto disconnect;
3201                 }
3202         }
3203
3204         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3205         if (!new_net_conf) {
3206                 conn_err(tconn, "Allocation of new net_conf failed\n");
3207                 goto disconnect;
3208         }
3209
3210         mutex_lock(&tconn->data.mutex);
3211         mutex_lock(&tconn->conf_update);
3212         old_net_conf = tconn->net_conf;
3213         *new_net_conf = *old_net_conf;
3214
3215         new_net_conf->wire_protocol = p_proto;
3216         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3217         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3218         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3219         new_net_conf->two_primaries = p_two_primaries;
3220
3221         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3222         mutex_unlock(&tconn->conf_update);
3223         mutex_unlock(&tconn->data.mutex);
3224
3225         crypto_free_hash(tconn->peer_integrity_tfm);
3226         kfree(tconn->int_dig_in);
3227         kfree(tconn->int_dig_vv);
3228         tconn->peer_integrity_tfm = peer_integrity_tfm;
3229         tconn->int_dig_in = int_dig_in;
3230         tconn->int_dig_vv = int_dig_vv;
3231
3232         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3233                 conn_info(tconn, "peer data-integrity-alg: %s\n",
3234                           integrity_alg[0] ? integrity_alg : "(none)");
3235
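        /* Let all RCU readers that may still be looking at old_net_conf
         * finish before the memory is freed. */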
3236         synchronize_rcu();
3237         kfree(old_net_conf);
3238         return 0;
3239
3240 disconnect_rcu_unlock:
3241         rcu_read_unlock();
3242 disconnect:
3243         crypto_free_hash(peer_integrity_tfm);
3244         kfree(int_dig_in);
3245         kfree(int_dig_vv);
3246         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3247         return -EIO;
3248 }
3249
3250 /* helper function
3251  * input: alg name, feature name
3252  * return: NULL (alg name was "")
3253  *         ERR_PTR(error) if something goes wrong
3254  *         or the crypto hash ptr, if it worked out ok. */
3255 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3256                 const char *alg, const char *name)
3257 {
3258         struct crypto_hash *tfm;
3259
3260         if (!alg[0])
3261                 return NULL;
3262
3263         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3264         if (IS_ERR(tfm)) {
3265                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3266                         alg, name, PTR_ERR(tfm));
3267                 return tfm;
3268         }
3269         return tfm;
3270 }
3271
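/* Drain and discard the payload of the current packet from the data socket so
 * that the receive stream stays aligned for the next packet header. */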
3272 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3273 {
3274         void *buffer = tconn->data.rbuf;
3275         int size = pi->size;
3276
3277         while (size) {
3278                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3279                 s = drbd_recv(tconn, buffer, s);
3280                 if (s <= 0) {
3281                         if (s < 0)
3282                                 return s;
3283                         break;
3284                 }
3285                 size -= s;
3286         }
3287         if (size)
3288                 return -EIO;
3289         return 0;
3290 }
3291
3292 /*
3293  * config_unknown_volume  -  device configuration command for unknown volume
3294  *
3295  * When a device is added to an existing connection, the node on which the
3296  * device is added first will send configuration commands to its peer but the
3297  * peer will not know about the device yet.  It will warn and ignore these
3298  * commands.  Once the device is added on the second node, the second node will
3299  * send the same device configuration commands, but in the other direction.
3300  *
3301  * (We can also end up here if drbd is misconfigured.)
3302  */
3303 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3304 {
3305         conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3306                   cmdname(pi->cmd), pi->vnr);
3307         return ignore_remaining_packet(tconn, pi);
3308 }
3309
3310 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3311 {
3312         struct drbd_conf *mdev;
3313         struct p_rs_param_95 *p;
3314         unsigned int header_size, data_size, exp_max_sz;
3315         struct crypto_hash *verify_tfm = NULL;
3316         struct crypto_hash *csums_tfm = NULL;
3317         struct net_conf *old_net_conf, *new_net_conf = NULL;
3318         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3319         const int apv = tconn->agreed_pro_version;
3320         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3321         int fifo_size = 0;
3322         int err;
3323
3324         mdev = vnr_to_mdev(tconn, pi->vnr);
3325         if (!mdev)
3326                 return config_unknown_volume(tconn, pi);
3327
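        /* The SyncParam packet grew over the protocol versions: apv 88
         * appended the verify-alg name, apv 89 added csums-alg (struct
         * p_rs_param_89), and apv 95 added the dynamic resync controller
         * settings (struct p_rs_param_95). */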
3328         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3329                     : apv == 88 ? sizeof(struct p_rs_param)
3330                                         + SHARED_SECRET_MAX
3331                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3332                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3333
3334         if (pi->size > exp_max_sz) {
3335                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3336                     pi->size, exp_max_sz);
3337                 return -EIO;
3338         }
3339
3340         if (apv <= 88) {
3341                 header_size = sizeof(struct p_rs_param);
3342                 data_size = pi->size - header_size;
3343         } else if (apv <= 94) {
3344                 header_size = sizeof(struct p_rs_param_89);
3345                 data_size = pi->size - header_size;
3346                 D_ASSERT(data_size == 0);
3347         } else {
3348                 header_size = sizeof(struct p_rs_param_95);
3349                 data_size = pi->size - header_size;
3350                 D_ASSERT(data_size == 0);
3351         }
3352
3353         /* initialize verify_alg and csums_alg */
3354         p = pi->data;
3355         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3356
3357         err = drbd_recv_all(mdev->tconn, p, header_size);
3358         if (err)
3359                 return err;
3360
3361         mutex_lock(&mdev->tconn->conf_update);
3362         old_net_conf = mdev->tconn->net_conf;
3363         if (get_ldev(mdev)) {
3364                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3365                 if (!new_disk_conf) {
3366                         put_ldev(mdev);
3367                         mutex_unlock(&mdev->tconn->conf_update);
3368                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3369                         return -ENOMEM;
3370                 }
3371
3372                 old_disk_conf = mdev->ldev->disk_conf;
3373                 *new_disk_conf = *old_disk_conf;
3374
3375                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3376         }
3377
3378         if (apv >= 88) {
3379                 if (apv == 88) {
3380                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3381                                 dev_err(DEV, "verify-alg of wrong size, "
3382                                         "peer wants %u, accepting only up to %u bytes\n",
3383                                         data_size, SHARED_SECRET_MAX);
3384                                 err = -EIO;
3385                                 goto reconnect;
3386                         }
3387
3388                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3389                         if (err)
3390                                 goto reconnect;
3391                         /* we expect NUL terminated string */
3392                         /* but just in case someone tries to be evil */
3393                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3394                         p->verify_alg[data_size-1] = 0;
3395
3396                 } else /* apv >= 89 */ {
3397                         /* we still expect NUL terminated strings */
3398                         /* but just in case someone tries to be evil */
3399                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3400                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3401                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3402                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3403                 }
3404
3405                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3406                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3407                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3408                                     old_net_conf->verify_alg, p->verify_alg);
3409                                 goto disconnect;
3410                         }
3411                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3412                                         p->verify_alg, "verify-alg");
3413                         if (IS_ERR(verify_tfm)) {
3414                                 verify_tfm = NULL;
3415                                 goto disconnect;
3416                         }
3417                 }
3418
3419                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3420                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3421                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3422                                     old_net_conf->csums_alg, p->csums_alg);
3423                                 goto disconnect;
3424                         }
3425                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3426                                         p->csums_alg, "csums-alg");
3427                         if (IS_ERR(csums_tfm)) {
3428                                 csums_tfm = NULL;
3429                                 goto disconnect;
3430                         }
3431                 }
3432
3433                 if (apv > 94 && new_disk_conf) {
3434                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3435                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3436                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3437                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3438
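                        /* Re-size the resync planner fifo so it covers the
                         * configured plan-ahead window at the resync
                         * scheduling interval (SLEEP_TIME). */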
3439                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3440                         if (fifo_size != mdev->rs_plan_s->size) {
3441                                 new_plan = fifo_alloc(fifo_size);
3442                                 if (!new_plan) {
3443                                         dev_err(DEV, "kmalloc of fifo_buffer failed\n");
3444                                         put_ldev(mdev);
3445                                         goto disconnect;
3446                                 }
3447                         }
3448                 }
3449
3450                 if (verify_tfm || csums_tfm) {
3451                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3452                         if (!new_net_conf) {
3453                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3454                                 goto disconnect;
3455                         }
3456
3457                         *new_net_conf = *old_net_conf;
3458
3459                         if (verify_tfm) {
3460                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3461                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3462                                 crypto_free_hash(mdev->tconn->verify_tfm);
3463                                 mdev->tconn->verify_tfm = verify_tfm;
3464                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3465                         }
3466                         if (csums_tfm) {
3467                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3468                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3469                                 crypto_free_hash(mdev->tconn->csums_tfm);
3470                                 mdev->tconn->csums_tfm = csums_tfm;
3471                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3472                         }
3473                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3474                 }
3475         }
3476
3477         if (new_disk_conf) {
3478                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3479                 put_ldev(mdev);
3480         }
3481
3482         if (new_plan) {
3483                 old_plan = mdev->rs_plan_s;
3484                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3485         }
3486
3487         mutex_unlock(&mdev->tconn->conf_update);
3488         synchronize_rcu();
3489         if (new_net_conf)
3490                 kfree(old_net_conf);
3491         kfree(old_disk_conf);
3492         kfree(old_plan);
3493
3494         return 0;
3495
3496 reconnect:
3497         if (new_disk_conf) {
3498                 put_ldev(mdev);
3499                 kfree(new_disk_conf);
3500         }
3501         mutex_unlock(&mdev->tconn->conf_update);
3502         return -EIO;
3503
3504 disconnect:
3505         kfree(new_plan);
3506         if (new_disk_conf) {
3507                 put_ldev(mdev);
3508                 kfree(new_disk_conf);
3509         }
3510         mutex_unlock(&mdev->tconn->conf_update);
3511         /* just for completeness: actually not needed,
3512          * as this is not reached if csums_tfm was ok. */
3513         crypto_free_hash(csums_tfm);
3514         /* but free the verify_tfm again, if csums_tfm did not work out */
3515         crypto_free_hash(verify_tfm);
3516         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3517         return -EIO;
3518 }
3519
3520 /* warn if the arguments differ by more than 12.5% */
3521 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3522         const char *s, sector_t a, sector_t b)
3523 {
3524         sector_t d;
3525         if (a == 0 || b == 0)
3526                 return;
3527         d = (a > b) ? (a - b) : (b - a);
3528         if (d > (a>>3) || d > (b>>3))
3529                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3530                      (unsigned long long)a, (unsigned long long)b);
3531 }
3532
3533 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3534 {
3535         struct drbd_conf *mdev;
3536         struct p_sizes *p = pi->data;
3537         enum determine_dev_size dd = unchanged;
3538         sector_t p_size, p_usize, my_usize;
3539         int ldsc = 0; /* local disk size changed */
3540         enum dds_flags ddsf;
3541
3542         mdev = vnr_to_mdev(tconn, pi->vnr);
3543         if (!mdev)
3544                 return config_unknown_volume(tconn, pi);
3545
3546         p_size = be64_to_cpu(p->d_size);
3547         p_usize = be64_to_cpu(p->u_size);
3548
3549         /* just store the peer's disk size for now.
3550          * we still need to figure out whether we accept that. */
3551         mdev->p_size = p_size;
3552
3553         if (get_ldev(mdev)) {
3554                 rcu_read_lock();
3555                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3556                 rcu_read_unlock();
3557
3558                 warn_if_differ_considerably(mdev, "lower level device sizes",
3559                            p_size, drbd_get_max_capacity(mdev->ldev));
3560                 warn_if_differ_considerably(mdev, "user requested size",
3561                                             p_usize, my_usize);
3562
3563                 /* if this is the first connect, or an otherwise expected
3564                  * param exchange, choose the minimum */
3565                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3566                         p_usize = min_not_zero(my_usize, p_usize);
3567
3568                 /* Never shrink a device with usable data during connect.
3569                    But allow online shrinking if we are connected. */
3570                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3571                     drbd_get_capacity(mdev->this_bdev) &&
3572                     mdev->state.disk >= D_OUTDATED &&
3573                     mdev->state.conn < C_CONNECTED) {
3574                         dev_err(DEV, "The peer's disk size is too small!\n");
3575                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3576                         put_ldev(mdev);
3577                         return -EIO;
3578                 }
3579
3580                 if (my_usize != p_usize) {
3581                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3582
3583                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3584                         if (!new_disk_conf) {
3585                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3586                                 put_ldev(mdev);
3587                                 return -ENOMEM;
3588                         }
3589
3590                         mutex_lock(&mdev->tconn->conf_update);
3591                         old_disk_conf = mdev->ldev->disk_conf;
3592                         *new_disk_conf = *old_disk_conf;
3593                         new_disk_conf->disk_size = p_usize;
3594
3595                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3596                         mutex_unlock(&mdev->tconn->conf_update);
3597                         synchronize_rcu();
3598                         kfree(old_disk_conf);
3599
3600                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3601                                  (unsigned long)p_usize);
3602                 }
3603
3604                 put_ldev(mdev);
3605         }
3606
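        /* dds_flags carry the peer's resize hints, e.g. DDSF_NO_RESYNC for a
         * resize done with --assume-clean (evaluated further down). */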
3607         ddsf = be16_to_cpu(p->dds_flags);
3608         if (get_ldev(mdev)) {
3609                 dd = drbd_determine_dev_size(mdev, ddsf);
3610                 put_ldev(mdev);
3611                 if (dd == dev_size_error)
3612                         return -EIO;
3613                 drbd_md_sync(mdev);
3614         } else {
3615                 /* I am diskless, need to accept the peer's size. */
3616                 drbd_set_my_capacity(mdev, p_size);
3617         }
3618
3619         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3620         drbd_reconsider_max_bio_size(mdev);
3621
3622         if (get_ldev(mdev)) {
3623                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3624                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3625                         ldsc = 1;
3626                 }
3627
3628                 put_ldev(mdev);
3629         }
3630
3631         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3632                 if (be64_to_cpu(p->c_size) !=
3633                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3634                         /* we have different sizes; the peer probably
3635                          * needs to know my new size... */
3636                         drbd_send_sizes(mdev, 0, ddsf);
3637                 }
3638                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3639                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3640                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3641                             mdev->state.disk >= D_INCONSISTENT) {
3642                                 if (ddsf & DDSF_NO_RESYNC)
3643                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3644                                 else
3645                                         resync_after_online_grow(mdev);
3646                         } else
3647                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3648                 }
3649         }
3650
3651         return 0;
3652 }
3653
3654 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3655 {
3656         struct drbd_conf *mdev;
3657         struct p_uuids *p = pi->data;
3658         u64 *p_uuid;
3659         int i, updated_uuids = 0;
3660
3661         mdev = vnr_to_mdev(tconn, pi->vnr);
3662         if (!mdev)
3663                 return config_unknown_volume(tconn, pi);
3664
3665         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
        if (!p_uuid) {
                dev_err(DEV, "kmalloc of p_uuid failed\n");
                return -ENOMEM;
        }
3666
3667         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3668                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3669
3670         kfree(mdev->p_uuid);
3671         mdev->p_uuid = p_uuid;
3672
3673         if (mdev->state.conn < C_CONNECTED &&
3674             mdev->state.disk < D_INCONSISTENT &&
3675             mdev->state.role == R_PRIMARY &&
3676             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3677                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3678                     (unsigned long long)mdev->ed_uuid);
3679                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3680                 return -EIO;
3681         }
3682
3683         if (get_ldev(mdev)) {
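                /* Flag 8 in UI_FLAGS is set by a peer that generated its
                 * current UUID with the intent to skip the initial full sync. */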
3684                 int skip_initial_sync =
3685                         mdev->state.conn == C_CONNECTED &&
3686                         mdev->tconn->agreed_pro_version >= 90 &&
3687                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3688                         (p_uuid[UI_FLAGS] & 8);
3689                 if (skip_initial_sync) {
3690                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3691                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3692                                         "clear_n_write from receive_uuids",
3693                                         BM_LOCKED_TEST_ALLOWED);
3694                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3695                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3696                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3697                                         CS_VERBOSE, NULL);
3698                         drbd_md_sync(mdev);
3699                         updated_uuids = 1;
3700                 }
3701                 put_ldev(mdev);
3702         } else if (mdev->state.disk < D_INCONSISTENT &&
3703                    mdev->state.role == R_PRIMARY) {
3704                 /* I am a diskless primary, the peer just created a new current UUID
3705                    for me. */
3706                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3707         }
3708
3709         /* Before we test for the disk state, we should wait until a possibly
3710            ongoing cluster-wide state change has finished. That is important if
3711            we are primary and are detaching from our disk. We need to see the
3712            new disk state... */
3713         mutex_lock(mdev->state_mutex);
3714         mutex_unlock(mdev->state_mutex);
3715         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3716                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3717
3718         if (updated_uuids)
3719                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3720
3721         return 0;
3722 }
3723
3724 /**
3725  * convert_state() - Converts the peer's view of the cluster state to our point of view
3726  * @ps:         The state as seen by the peer.
3727  */
3728 static union drbd_state convert_state(union drbd_state ps)
3729 {
3730         union drbd_state ms;
3731
3732         static enum drbd_conns c_tab[] = {
3733                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3734                 [C_CONNECTED] = C_CONNECTED,
3735
3736                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3737                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3738                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3739                 [C_VERIFY_S]       = C_VERIFY_T,
3740                 [C_MASK]   = C_MASK,
3741         };
3742
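        /* Mirror the asymmetric fields: the peer's own role/disk become our
         * view of peer/pdsk, and its view of us becomes our role/disk. */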
3743         ms.i = ps.i;
3744
3745         ms.conn = c_tab[ps.conn];
3746         ms.peer = ps.role;
3747         ms.role = ps.peer;
3748         ms.pdsk = ps.disk;
3749         ms.disk = ps.pdsk;
3750         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3751
3752         return ms;
3753 }
3754
3755 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3756 {
3757         struct drbd_conf *mdev;
3758         struct p_req_state *p = pi->data;
3759         union drbd_state mask, val;
3760         enum drbd_state_rv rv;
3761
3762         mdev = vnr_to_mdev(tconn, pi->vnr);
3763         if (!mdev)
3764                 return -EIO;
3765
3766         mask.i = be32_to_cpu(p->mask);
3767         val.i = be32_to_cpu(p->val);
3768
3769         if (test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags) &&
3770             mutex_is_locked(mdev->state_mutex)) {
3771                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3772                 return 0;
3773         }
3774
3775         mask = convert_state(mask);
3776         val = convert_state(val);
3777
3778         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3779         drbd_send_sr_reply(mdev, rv);
3780
3781         drbd_md_sync(mdev);
3782
3783         return 0;
3784 }
3785
3786 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3787 {
3788         struct p_req_state *p = pi->data;
3789         union drbd_state mask, val;
3790         enum drbd_state_rv rv;
3791
3792         mask.i = be32_to_cpu(p->mask);
3793         val.i = be32_to_cpu(p->val);
3794
3795         if (test_bit(RESOLVE_CONFLICTS, &tconn->flags) &&
3796             mutex_is_locked(&tconn->cstate_mutex)) {
3797                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3798                 return 0;
3799         }
3800
3801         mask = convert_state(mask);
3802         val = convert_state(val);
3803
3804         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3805         conn_send_sr_reply(tconn, rv);
3806
3807         return 0;
3808 }
3809
3810 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3811 {
3812         struct drbd_conf *mdev;
3813         struct p_state *p = pi->data;
3814         union drbd_state os, ns, peer_state;
3815         enum drbd_disk_state real_peer_disk;
3816         enum chg_state_flags cs_flags;
3817         int rv;
3818
3819         mdev = vnr_to_mdev(tconn, pi->vnr);
3820         if (!mdev)
3821                 return config_unknown_volume(tconn, pi);
3822
3823         peer_state.i = be32_to_cpu(p->state);
3824
3825         real_peer_disk = peer_state.disk;
3826         if (peer_state.disk == D_NEGOTIATING) {
3827                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3828                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3829         }
3830
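        /* Sample the local state under req_lock; if it changes while it is
         * evaluated unlocked below, the check before committing the new state
         * sends us back here via the retry label. */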
3831         spin_lock_irq(&mdev->tconn->req_lock);
3832  retry:
3833         os = ns = drbd_read_state(mdev);
3834         spin_unlock_irq(&mdev->tconn->req_lock);
3835
3836         /* If some other part of the code (asender thread, timeout)
3837          * already decided to close the connection again,
3838          * we must not "re-establish" it here. */
3839         if (os.conn <= C_TEAR_DOWN)
3840                 return -ECONNRESET;
3841
3842         /* If this is the "end of sync" confirmation, usually the peer disk
3843          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3844          * set) resync started in PausedSyncT, or if the timing of pause-/
3845          * unpause-sync events has been "just right", the peer disk may
3846          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3847          */
3848         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3849             real_peer_disk == D_UP_TO_DATE &&
3850             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3851                 /* If we are (becoming) SyncSource, but peer is still in sync
3852                  * preparation, ignore its uptodate-ness to avoid flapping, it
3853                  * will change to inconsistent once the peer reaches active
3854                  * syncing states.
3855                  * It may have changed syncer-paused flags, however, so we
3856                  * cannot ignore this completely. */
3857                 if (peer_state.conn > C_CONNECTED &&
3858                     peer_state.conn < C_SYNC_SOURCE)
3859                         real_peer_disk = D_INCONSISTENT;
3860
3861                 /* if peer_state changes to connected at the same time,
3862                  * it explicitly notifies us that it finished resync.
3863                  * Maybe we should finish it up, too? */
3864                 else if (os.conn >= C_SYNC_SOURCE &&
3865                          peer_state.conn == C_CONNECTED) {
3866                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3867                                 drbd_resync_finished(mdev);
3868                         return 0;
3869                 }
3870         }
3871
3872         /* explicit verify finished notification, stop sector reached. */
3873         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3874             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3875                 ov_out_of_sync_print(mdev);
3876                 drbd_resync_finished(mdev);
3877                 return 0;
3878         }
3879
3880         /* peer says his disk is inconsistent, while we think it is uptodate,
3881          * and this happens while the peer still thinks we have a sync going on,
3882          * but we think we are already done with the sync.
3883          * We ignore this to avoid flapping pdsk.
3884          * This should not happen if the peer is a recent version of drbd. */
3885         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3886             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3887                 real_peer_disk = D_UP_TO_DATE;
3888
3889         if (ns.conn == C_WF_REPORT_PARAMS)
3890                 ns.conn = C_CONNECTED;
3891
3892         if (peer_state.conn == C_AHEAD)
3893                 ns.conn = C_BEHIND;
3894
3895         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3896             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3897                 int cr; /* consider resync */
3898
3899                 /* if we established a new connection */
3900                 cr  = (os.conn < C_CONNECTED);
3901                 /* if we had an established connection
3902                  * and one of the nodes newly attaches a disk */
3903                 cr |= (os.conn == C_CONNECTED &&
3904                        (peer_state.disk == D_NEGOTIATING ||
3905                         os.disk == D_NEGOTIATING));
3906                 /* if we have both been inconsistent, and the peer has been
3907                  * forced to be UpToDate with --overwrite-data */
3908                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3909                 /* if we had been plain connected, and the admin requested to
3910                  * start a sync by "invalidate" or "invalidate-remote" */
3911                 cr |= (os.conn == C_CONNECTED &&
3912                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3913                                  peer_state.conn <= C_WF_BITMAP_T));
3914
3915                 if (cr)
3916                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3917
3918                 put_ldev(mdev);
3919                 if (ns.conn == C_MASK) {
3920                         ns.conn = C_CONNECTED;
3921                         if (mdev->state.disk == D_NEGOTIATING) {
3922                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3923                         } else if (peer_state.disk == D_NEGOTIATING) {
3924                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3925                                 peer_state.disk = D_DISKLESS;
3926                                 real_peer_disk = D_DISKLESS;
3927                         } else {
3928                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3929                                         return -EIO;
3930                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3931                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3932                                 return -EIO;
3933                         }
3934                 }
3935         }
3936
3937         spin_lock_irq(&mdev->tconn->req_lock);
3938         if (os.i != drbd_read_state(mdev).i)
3939                 goto retry;
3940         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3941         ns.peer = peer_state.role;
3942         ns.pdsk = real_peer_disk;
3943         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3944         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3945                 ns.disk = mdev->new_state_tmp.disk;
3946         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3947         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3948             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3949                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3950                    for temporary network outages! */
3951                 spin_unlock_irq(&mdev->tconn->req_lock);
3952                 dev_err(DEV, "Aborting Connect, cannot thaw IO with an only Consistent peer\n");
3953                 tl_clear(mdev->tconn);
3954                 drbd_uuid_new_current(mdev);
3955                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3956                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3957                 return -EIO;
3958         }
3959         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3960         ns = drbd_read_state(mdev);
3961         spin_unlock_irq(&mdev->tconn->req_lock);
3962
3963         if (rv < SS_SUCCESS) {
3964                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3965                 return -EIO;
3966         }
3967
3968         if (os.conn > C_WF_REPORT_PARAMS) {
3969                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3970                     peer_state.disk != D_NEGOTIATING ) {
3971                         /* we want resync, peer has not yet decided to sync... */
3972                         /* Nowadays this is only used when a node is forced into the
3973                            primary role and its disk is set to UpToDate along with that */
3974                         drbd_send_uuids(mdev);
3975                         drbd_send_current_state(mdev);
3976                 }
3977         }
3978
3979         clear_bit(DISCARD_MY_DATA, &mdev->flags);
3980
3981         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3982
3983         return 0;
3984 }
3985
3986 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3987 {
3988         struct drbd_conf *mdev;
3989         struct p_rs_uuid *p = pi->data;
3990
3991         mdev = vnr_to_mdev(tconn, pi->vnr);
3992         if (!mdev)
3993                 return -EIO;
3994
3995         wait_event(mdev->misc_wait,
3996                    mdev->state.conn == C_WF_SYNC_UUID ||
3997                    mdev->state.conn == C_BEHIND ||
3998                    mdev->state.conn < C_CONNECTED ||
3999                    mdev->state.disk < D_NEGOTIATING);
4000
4001         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
4002
4003         /* Here the _drbd_uuid_ functions are right, current should
4004            _not_ be rotated into the history */
4005         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
4006                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
4007                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
4008
4009                 drbd_print_uuids(mdev, "updated sync uuid");
4010                 drbd_start_resync(mdev, C_SYNC_TARGET);
4011
4012                 put_ldev(mdev);
4013         } else
4014                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
4015
4016         return 0;
4017 }
4018
4019 /**
4020  * receive_bitmap_plain() - receive a chunk of a plain (not compressed) bitmap
 * @mdev:       DRBD device.
 * @size:       payload size of the current packet, in bytes.
 * @p:          buffer holding the received bitmap words.
 * @c:          bitmap transfer context, tracking progress across packets.
4021  *
4022  * Return 0 when done, 1 when another iteration is needed, and a negative error
4023  * code upon failure.
4024  */
4025 static int
4026 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
4027                      unsigned long *p, struct bm_xfer_ctx *c)
4028 {
4029         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4030                                  drbd_header_size(mdev->tconn);
4031         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4032                                        c->bm_words - c->word_offset);
4033         unsigned int want = num_words * sizeof(*p);
4034         int err;
4035
4036         if (want != size) {
4037                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
4038                 return -EIO;
4039         }
4040         if (want == 0)
4041                 return 0;
4042         err = drbd_recv_all(mdev->tconn, p, want);
4043         if (err)
4044                 return err;
4045
4046         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
4047
4048         c->word_offset += num_words;
4049         c->bit_offset = c->word_offset * BITS_PER_LONG;
4050         if (c->bit_offset > c->bm_bits)
4051                 c->bit_offset = c->bm_bits;
4052
4053         return 1;
4054 }
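
/* Worked example for the sizing logic above (the buffer and header
 * sizes are assumptions for illustration, not derived from this file):
 * with DRBD_SOCKET_BUFFER_SIZE == 4096 and a 16 byte header,
 * data_size is 4080 bytes.  On a 64 bit host (sizeof(unsigned long)
 * == 8) that is room for 510 words per packet, so
 *
 *	num_words = min(510, bm_words - word_offset);
 *	want      = num_words * 8;	(must equal the "size" argument)
 *
 * A bitmap of 1200 words thus arrives as two full packets of 510
 * words plus a final packet of 180 words; the next call then sees
 * want == 0 and returns 0 ("done").
 */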
4055
4056 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4057 {
4058         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4059 }
4060
4061 static int dcbp_get_start(struct p_compressed_bm *p)
4062 {
4063         return (p->encoding & 0x80) != 0;
4064 }
4065
4066 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4067 {
4068         return (p->encoding >> 4) & 0x7;
4069 }
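
/* The three accessors above unpack the single "encoding" byte of
 * struct p_compressed_bm.  As implied by the masks and shifts used
 * here, its layout is:
 *
 *	bit  7    : start toggle -- whether the first run-length
 *	            describes set (1) or cleared (0) bits
 *	bits 6..4 : number of pad bits at the end of the bit stream
 *	bits 3..0 : bitmap code, e.g. RLE_VLI_Bits
 *
 * An encoding byte of 0x95, for example, means: start with a run of
 * set bits, 1 pad bit, code 0x5.
 */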
4070
4071 /**
4072  * recv_bm_rle_bits() - decode one RLE/VLI encoded compressed bitmap chunk
4073  *
4074  * Return 0 when done, 1 when another iteration is needed, and a negative error
4075  * code upon failure.
4076  */
4077 static int
4078 recv_bm_rle_bits(struct drbd_conf *mdev,
4079                  struct p_compressed_bm *p,
4080                  struct bm_xfer_ctx *c,
4081                  unsigned int len)
4082 {
4083         struct bitstream bs;
4084         u64 look_ahead;
4085         u64 rl;
4086         u64 tmp;
4087         unsigned long s = c->bit_offset;
4088         unsigned long e;
4089         int toggle = dcbp_get_start(p);
4090         int have;
4091         int bits;
4092
4093         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4094
4095         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4096         if (bits < 0)
4097                 return -EIO;
4098
4099         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4100                 bits = vli_decode_bits(&rl, look_ahead);
4101                 if (bits <= 0)
4102                         return -EIO;
4103
4104                 if (toggle) {
4105                         e = s + rl - 1;
4106                         if (e >= c->bm_bits) {
4107                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4108                                 return -EIO;
4109                         }
4110                         _drbd_bm_set_bits(mdev, s, e);
4111                 }
4112
4113                 if (have < bits) {
4114                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4115                                 have, bits, look_ahead,
4116                                 (unsigned int)(bs.cur.b - p->code),
4117                                 (unsigned int)bs.buf_len);
4118                         return -EIO;
4119                 }
4120                 look_ahead >>= bits;
4121                 have -= bits;
4122
4123                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4124                 if (bits < 0)
4125                         return -EIO;
4126                 look_ahead |= tmp << have;
4127                 have += bits;
4128         }
4129
4130         c->bit_offset = s;
4131         bm_xfer_ctx_bit_to_word_offset(c);
4132
4133         return (s != c->bm_bits);
4134 }
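
/* Decoding walk-through (run lengths made up for illustration):
 * assume c->bit_offset == 0, a start toggle of 0, and decoded run
 * lengths of 10, 4 and 6.  The loop above then performs:
 *
 *	run 1: toggle == 0, bits  0..9  stay cleared, s advances to 10
 *	run 2: toggle == 1, bits 10..13 set via _drbd_bm_set_bits()
 *	run 3: toggle == 0, bits 14..19 stay cleared, s advances to 20
 *
 * Runs of cleared bits are represented purely by advancing s; only
 * "set" runs touch the bitmap.  When s has reached bm_bits the
 * function returns 0 ("done"), otherwise 1 ("another packet needed").
 */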
4135
4136 /**
4137  * decode_bitmap_c() - dispatch decoding of a compressed bitmap packet
4138  *
4139  * Return 0 when done, 1 when another iteration is needed, and a negative error
4140  * code upon failure.
4141  */
4142 static int
4143 decode_bitmap_c(struct drbd_conf *mdev,
4144                 struct p_compressed_bm *p,
4145                 struct bm_xfer_ctx *c,
4146                 unsigned int len)
4147 {
4148         if (dcbp_get_code(p) == RLE_VLI_Bits)
4149                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4150
4151         /* other variants had been implemented for evaluation,
4152          * but have been dropped as this one turned out to be "best"
4153          * during all our tests. */
4154
4155         dev_err(DEV, "decode_bitmap_c: unknown encoding %u\n", p->encoding);
4156         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4157         return -EIO;
4158 }
4159
4160 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4161                 const char *direction, struct bm_xfer_ctx *c)
4162 {
4163         /* what would it take to transfer it "plaintext" */
4164         unsigned int header_size = drbd_header_size(mdev->tconn);
4165         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4166         unsigned int plain =
4167                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4168                 c->bm_words * sizeof(unsigned long);
4169         unsigned int total = c->bytes[0] + c->bytes[1];
4170         unsigned int r;
4171
4172         /* total can not be zero. but just in case: */
4173         if (total == 0)
4174                 return;
4175
4176         /* don't report if not compressed */
4177         if (total >= plain)
4178                 return;
4179
4180         /* total < plain. check for overflow, still */
4181         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4182                                     : (1000 * total / plain);
4183
4184         if (r > 1000)
4185                 r = 1000;
4186
4187         r = 1000 - r;
4188         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4189              "total %u; compression: %u.%u%%\n",
4190                         direction,
4191                         c->bytes[1], c->packets[1],
4192                         c->bytes[0], c->packets[0],
4193                         total, r/10, r % 10);
4194 }
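
/* Example of the ratio computation above (byte counts made up):
 * with plain == 100000 and total == 12000, total is far below
 * UINT_MAX/1000, so
 *
 *	r = 1000 * 12000 / 100000;	(120 per mille transferred)
 *	r = 1000 - 120;			(880 per mille saved)
 *
 * which is printed as "compression: 88.0%".
 */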
4195
4196 /* Since we are processing the bitfield from lower addresses to higher,
4197    it does not matter if we process it in 32 bit chunks or 64 bit
4198    chunks as long as it is little endian. (Understand it as a byte stream,
4199    beginning with the lowest byte...) If we used big endian instead,
4200    we would need to process it from the highest address to the lowest
4201    in order to be agnostic to the 32 vs 64 bit issue.
4202
4203    Returns 0 on success, and a negative error code otherwise. */
4204 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4205 {
4206         struct drbd_conf *mdev;
4207         struct bm_xfer_ctx c;
4208         int err;
4209
4210         mdev = vnr_to_mdev(tconn, pi->vnr);
4211         if (!mdev)
4212                 return -EIO;
4213
4214         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4215         /* you are supposed to send additional out-of-sync information
4216          * if you actually set bits during this phase */
4217
4218         c = (struct bm_xfer_ctx) {
4219                 .bm_bits = drbd_bm_bits(mdev),
4220                 .bm_words = drbd_bm_words(mdev),
4221         };
4222
4223         for (;;) {
4224                 if (pi->cmd == P_BITMAP)
4225                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4226                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4227                         /* MAYBE: sanity check that we speak proto >= 90,
4228                          * and the feature is enabled! */
4229                         struct p_compressed_bm *p = pi->data;
4230
4231                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4232                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4233                                 err = -EIO;
4234                                 goto out;
4235                         }
4236                         if (pi->size <= sizeof(*p)) {
4237                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4238                                 err = -EIO;
4239                                 goto out;
4240                         }
4241                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4242                         if (err)
4243                                 goto out;
4244                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4245                 } else {
4246                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4247                         err = -EIO;
4248                         goto out;
4249                 }
4250
4251                 c.packets[pi->cmd == P_BITMAP]++;
4252                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4253
4254                 if (err <= 0) {
4255                         if (err < 0)
4256                                 goto out;
4257                         break;
4258                 }
4259                 err = drbd_recv_header(mdev->tconn, pi);
4260                 if (err)
4261                         goto out;
4262         }
4263
4264         INFO_bm_xfer_stats(mdev, "receive", &c);
4265
4266         if (mdev->state.conn == C_WF_BITMAP_T) {
4267                 enum drbd_state_rv rv;
4268
4269                 err = drbd_send_bitmap(mdev);
4270                 if (err)
4271                         goto out;
4272                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4273                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4274                 D_ASSERT(rv == SS_SUCCESS);
4275         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4276                 /* admin may have requested C_DISCONNECTING,
4277                  * other threads may have noticed network errors */
4278                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4279                     drbd_conn_str(mdev->state.conn));
4280         }
4281         err = 0;
4282
4283  out:
4284         drbd_bm_unlock(mdev);
4285         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4286                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4287         return err;
4288 }
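
/* Illustration of the endianness note above receive_bitmap() (not
 * functional code): a bitmap whose first word has the value 0x0f0f is
 * stored as the byte stream 0x0f 0x0f 0x00 ...  On little endian,
 * reading that stream as a 32 bit word or as the low half of a 64 bit
 * word yields the same value, with bit N of the word being bit N of
 * the bitmap -- so the chunk size does not matter.  On big endian,
 * the byte order within each word is reversed, and 32 vs 64 bit
 * chunking would disagree about which memory bit is "bit 0".
 */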
4289
4290 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4291 {
4292         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4293                  pi->cmd, pi->size);
4294
4295         return ignore_remaining_packet(tconn, pi);
4296 }
4297
4298 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4299 {
4300         /* Make sure we've acked all the TCP data associated
4301          * with the data requests being unplugged */
4302         drbd_tcp_quickack(tconn->data.socket);
4303
4304         return 0;
4305 }
4306
4307 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4308 {
4309         struct drbd_conf *mdev;
4310         struct p_block_desc *p = pi->data;
4311
4312         mdev = vnr_to_mdev(tconn, pi->vnr);
4313         if (!mdev)
4314                 return -EIO;
4315
4316         switch (mdev->state.conn) {
4317         case C_WF_SYNC_UUID:
4318         case C_WF_BITMAP_T:
4319         case C_BEHIND:
4320                 break;
4321         default:
4322                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4323                                 drbd_conn_str(mdev->state.conn));
4324         }
4325
4326         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4327
4328         return 0;
4329 }
4330
4331 struct data_cmd {
4332         int expect_payload;
4333         size_t pkt_size;
4334         int (*fn)(struct drbd_tconn *, struct packet_info *);
4335 };
4336
4337 static struct data_cmd drbd_cmd_handler[] = {
4338         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4339         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4340         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
4341         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier },
4342         [P_BITMAP]          = { 1, 0, receive_bitmap },
4343         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
4344         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4345         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4346         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4347         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4348         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4349         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4350         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4351         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4352         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4353         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4354         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4355         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4356         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4357         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4358         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4359         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4360         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4361         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4362 };
4363
4364 static void drbdd(struct drbd_tconn *tconn)
4365 {
4366         struct packet_info pi;
4367         size_t shs; /* sub header size */
4368         int err;
4369
4370         while (get_t_state(&tconn->receiver) == RUNNING) {
4371                 struct data_cmd *cmd;
4372
4373                 drbd_thread_current_set_cpu(&tconn->receiver);
4374                 if (drbd_recv_header(tconn, &pi))
4375                         goto err_out;
4376
4377                 cmd = &drbd_cmd_handler[pi.cmd];
4378                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4379                         conn_err(tconn, "Unexpected data packet %s (0x%04x)\n",
4380                                  cmdname(pi.cmd), pi.cmd);
4381                         goto err_out;
4382                 }
4383
4384                 shs = cmd->pkt_size;
4385                 if (pi.size > shs && !cmd->expect_payload) {
4386                         conn_err(tconn, "No payload expected %s l:%d\n",
4387                                  cmdname(pi.cmd), pi.size);
4388                         goto err_out;
4389                 }
4390
4391                 if (shs) {
4392                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4393                         if (err)
4394                                 goto err_out;
4395                         pi.size -= shs;
4396                 }
4397
4398                 err = cmd->fn(tconn, &pi);
4399                 if (err) {
4400                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4401                                  cmdname(pi.cmd), err, pi.size);
4402                         goto err_out;
4403                 }
4404         }
4405         return;
4406
4407     err_out:
4408         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4409 }
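
/* Dispatch walk-through for one packet: suppose drbd_recv_header()
 * decodes pi.cmd == P_BARRIER.  The table above yields
 * { 0, sizeof(struct p_barrier), receive_Barrier }, so drbdd() reads
 * that fixed-size sub header into pi.data, subtracts it from pi.size
 * (a nonzero remainder would already have been rejected, since no
 * payload is expected) and calls receive_Barrier(tconn, &pi).
 * Variable-size packets such as P_BITMAP instead declare a pkt_size
 * of 0 and consume pi.size themselves.
 */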
4410
4411 void conn_flush_workqueue(struct drbd_tconn *tconn)
4412 {
4413         struct drbd_wq_barrier barr;
4414
4415         barr.w.cb = w_prev_work_done;
4416         barr.w.tconn = tconn;
4417         init_completion(&barr.done);
4418         drbd_queue_work(&tconn->sender_work, &barr.w);
4419         wait_for_completion(&barr.done);
4420 }
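
/* conn_flush_workqueue() is the usual "barrier work item" idiom:
 * rather than tracking individual work items, it queues a no-op work
 * behind everything already on sender_work and sleeps until that
 * no-op has run.  A minimal sketch of the callback side, assuming the
 * drbd_wq_barrier layout used above (the actual w_prev_work_done
 * lives elsewhere in drbd):
 *
 *	int w_prev_work_done(struct drbd_work *w, int cancel)
 *	{
 *		struct drbd_wq_barrier *b =
 *			container_of(w, struct drbd_wq_barrier, w);
 *		complete(&b->done);
 *		return 0;
 *	}
 */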
4421
4422 static void conn_disconnect(struct drbd_tconn *tconn)
4423 {
4424         struct drbd_conf *mdev;
4425         enum drbd_conns oc;
4426         int vnr;
4427
4428         if (tconn->cstate == C_STANDALONE)
4429                 return;
4430
4431         /* We are about to start the cleanup after connection loss.
4432          * Make sure drbd_make_request knows about that.
4433          * Usually we should be in some network failure state already,
4434          * but just in case we are not, we fix it up here.
4435          */
4436         conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4437
4438         /* asender does not clean up anything. it must not interfere, either */
4439         drbd_thread_stop(&tconn->asender);
4440         drbd_free_sock(tconn);
4441
4442         rcu_read_lock();
4443         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4444                 kref_get(&mdev->kref);
4445                 rcu_read_unlock();
4446                 drbd_disconnected(mdev);
4447                 kref_put(&mdev->kref, &drbd_minor_destroy);
4448                 rcu_read_lock();
4449         }
4450         rcu_read_unlock();
4451
4452         if (!list_empty(&tconn->current_epoch->list))
4453                 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4454         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4455         atomic_set(&tconn->current_epoch->epoch_size, 0);
4456         tconn->send.seen_any_write_yet = false;
4457
4458         conn_info(tconn, "Connection closed\n");
4459
4460         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4461                 conn_try_outdate_peer_async(tconn);
4462
4463         spin_lock_irq(&tconn->req_lock);
4464         oc = tconn->cstate;
4465         if (oc >= C_UNCONNECTED)
4466                 _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4467
4468         spin_unlock_irq(&tconn->req_lock);
4469
4470         if (oc == C_DISCONNECTING)
4471                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4472 }
4473
4474 static int drbd_disconnected(struct drbd_conf *mdev)
4475 {
4476         unsigned int i;
4477
4478         /* wait for current activity to cease. */
4479         spin_lock_irq(&mdev->tconn->req_lock);
4480         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4481         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4482         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4483         spin_unlock_irq(&mdev->tconn->req_lock);
4484
4485         /* We do not have data structures that would allow us to
4486          * get the rs_pending_cnt down to 0 again.
4487          *  * On C_SYNC_TARGET we do not have any data structures describing
4488          *    the pending RSDataRequest's we have sent.
4489          *  * On C_SYNC_SOURCE there is no data structure that tracks
4490          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4491          *  And no, it is not the sum of the reference counts in the
4492          *  resync_LRU. The resync_LRU tracks the whole operation including
4493          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4494          *  on the fly. */
4495         drbd_rs_cancel_all(mdev);
4496         mdev->rs_total = 0;
4497         mdev->rs_failed = 0;
4498         atomic_set(&mdev->rs_pending_cnt, 0);
4499         wake_up(&mdev->misc_wait);
4500
4501         del_timer_sync(&mdev->resync_timer);
4502         resync_timer_fn((unsigned long)mdev);
4503
4504         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4505          * w_make_resync_request etc. which may still be on the worker queue
4506          * to be "canceled" */
4507         drbd_flush_workqueue(mdev);
4508
4509         drbd_finish_peer_reqs(mdev);
4510
4511         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4512            might have queued new work. The flush before drbd_finish_peer_reqs() is
4513            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4514         drbd_flush_workqueue(mdev);
4515
4516         /* need to do it again, drbd_finish_peer_reqs() may have populated it
4517          * again via drbd_try_clear_on_disk_bm(). */
4518         drbd_rs_cancel_all(mdev);
4519
4520         kfree(mdev->p_uuid);
4521         mdev->p_uuid = NULL;
4522
4523         if (!drbd_suspended(mdev))
4524                 tl_clear(mdev->tconn);
4525
4526         drbd_md_sync(mdev);
4527
4528         /* serialize with bitmap writeout triggered by the state change,
4529          * if any. */
4530         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4531
4532         /* tcp_close and release of sendpage pages can be deferred.  I don't
4533          * want to use SO_LINGER, because apparently it can be deferred for
4534          * more than 20 seconds (longest time I checked).
4535          *
4536          * Actually we don't care for exactly when the network stack does its
4537          * put_page(), but release our reference on these pages right here.
4538          */
4539         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4540         if (i)
4541                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4542         i = atomic_read(&mdev->pp_in_use_by_net);
4543         if (i)
4544                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4545         i = atomic_read(&mdev->pp_in_use);
4546         if (i)
4547                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4548
4549         D_ASSERT(list_empty(&mdev->read_ee));
4550         D_ASSERT(list_empty(&mdev->active_ee));
4551         D_ASSERT(list_empty(&mdev->sync_ee));
4552         D_ASSERT(list_empty(&mdev->done_ee));
4553
4554         return 0;
4555 }
4556
4557 /*
4558  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4559  * we can agree on is stored in agreed_pro_version.
4560  *
4561  * feature flags and the reserved array should be enough room for future
4562  * enhancements of the handshake protocol, and possible plugins...
4563  *
4564  * for now, they are expected to be zero, but ignored.
4565  */
4566 static int drbd_send_features(struct drbd_tconn *tconn)
4567 {
4568         struct drbd_socket *sock;
4569         struct p_connection_features *p;
4570
4571         sock = &tconn->data;
4572         p = conn_prepare_command(tconn, sock);
4573         if (!p)
4574                 return -EIO;
4575         memset(p, 0, sizeof(*p));
4576         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4577         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4578         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4579 }
4580
4581 /*
4582  * return values:
4583  *   1 yes, we have a valid connection
4584  *   0 oops, did not work out, please try again
4585  *  -1 peer talks different language,
4586  *     no point in trying again, please go standalone.
4587  */
4588 static int drbd_do_features(struct drbd_tconn *tconn)
4589 {
4590         /* ASSERT current == tconn->receiver ... */
4591         struct p_connection_features *p;
4592         const int expect = sizeof(struct p_connection_features);
4593         struct packet_info pi;
4594         int err;
4595
4596         err = drbd_send_features(tconn);
4597         if (err)
4598                 return 0;
4599
4600         err = drbd_recv_header(tconn, &pi);
4601         if (err)
4602                 return 0;
4603
4604         if (pi.cmd != P_CONNECTION_FEATURES) {
4605                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4606                          cmdname(pi.cmd), pi.cmd);
4607                 return -1;
4608         }
4609
4610         if (pi.size != expect) {
4611                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4612                      expect, pi.size);
4613                 return -1;
4614         }
4615
4616         p = pi.data;
4617         err = drbd_recv_all_warn(tconn, p, expect);
4618         if (err)
4619                 return 0;
4620
4621         p->protocol_min = be32_to_cpu(p->protocol_min);
4622         p->protocol_max = be32_to_cpu(p->protocol_max);
4623         if (p->protocol_max == 0)
4624                 p->protocol_max = p->protocol_min;
4625
4626         if (PRO_VERSION_MAX < p->protocol_min ||
4627             PRO_VERSION_MIN > p->protocol_max)
4628                 goto incompat;
4629
4630         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4631
4632         conn_info(tconn, "Handshake successful: "
4633              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4634
4635         return 1;
4636
4637  incompat:
4638         conn_err(tconn, "incompatible DRBD dialects: "
4639             "I support %d-%d, peer supports %d-%d\n",
4640             PRO_VERSION_MIN, PRO_VERSION_MAX,
4641             p->protocol_min, p->protocol_max);
4642         return -1;
4643 }
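
/* Agreement example (version numbers hypothetical): if we support
 * protocols 86..101 and the peer announces 90..96, the ranges overlap
 * and
 *
 *	agreed_pro_version = min(101, 96);	(i.e. 96)
 *
 * Had the peer announced 102..110 instead, PRO_VERSION_MAX <
 * protocol_min would hold and we would take the "incompat" exit.
 */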
4644
4645 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4646 static int drbd_do_auth(struct drbd_tconn *tconn)
4647 {
4648         dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4649         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4650         return -1;
4651 }
4652 #else
4653 #define CHALLENGE_LEN 64
4654
4655 /* Return value:
4656         1 - auth succeeded,
4657         0 - failed, try again (network error),
4658         -1 - auth failed, don't try again.
4659 */
4660
4661 static int drbd_do_auth(struct drbd_tconn *tconn)
4662 {
4663         struct drbd_socket *sock;
4664         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4665         struct scatterlist sg;
4666         char *response = NULL;
4667         char *right_response = NULL;
4668         char *peers_ch = NULL;
4669         unsigned int key_len;
4670         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4671         unsigned int resp_size;
4672         struct hash_desc desc;
4673         struct packet_info pi;
4674         struct net_conf *nc;
4675         int err, rv;
4676
4677         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4678
4679         rcu_read_lock();
4680         nc = rcu_dereference(tconn->net_conf);
4681         key_len = strlen(nc->shared_secret);
4682         memcpy(secret, nc->shared_secret, key_len);
4683         rcu_read_unlock();
4684
4685         desc.tfm = tconn->cram_hmac_tfm;
4686         desc.flags = 0;
4687
4688         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4689         if (rv) {
4690                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4691                 rv = -1;
4692                 goto fail;
4693         }
4694
4695         get_random_bytes(my_challenge, CHALLENGE_LEN);
4696
4697         sock = &tconn->data;
4698         if (!conn_prepare_command(tconn, sock)) {
4699                 rv = 0;
4700                 goto fail;
4701         }
4702         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4703                                 my_challenge, CHALLENGE_LEN);
4704         if (!rv)
4705                 goto fail;
4706
4707         err = drbd_recv_header(tconn, &pi);
4708         if (err) {
4709                 rv = 0;
4710                 goto fail;
4711         }
4712
4713         if (pi.cmd != P_AUTH_CHALLENGE) {
4714                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4715                          cmdname(pi.cmd), pi.cmd);
4716                 rv = 0;
4717                 goto fail;
4718         }
4719
4720         if (pi.size > CHALLENGE_LEN * 2) {
4721                 conn_err(tconn, "AuthChallenge payload too big.\n");
4722                 rv = -1;
4723                 goto fail;
4724         }
4725
4726         peers_ch = kmalloc(pi.size, GFP_NOIO);
4727         if (peers_ch == NULL) {
4728                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4729                 rv = -1;
4730                 goto fail;
4731         }
4732
4733         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4734         if (err) {
4735                 rv = 0;
4736                 goto fail;
4737         }
4738
4739         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4740         response = kmalloc(resp_size, GFP_NOIO);
4741         if (response == NULL) {
4742                 conn_err(tconn, "kmalloc of response failed\n");
4743                 rv = -1;
4744                 goto fail;
4745         }
4746
4747         sg_init_table(&sg, 1);
4748         sg_set_buf(&sg, peers_ch, pi.size);
4749
4750         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4751         if (rv) {
4752                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4753                 rv = -1;
4754                 goto fail;
4755         }
4756
4757         if (!conn_prepare_command(tconn, sock)) {
4758                 rv = 0;
4759                 goto fail;
4760         }
4761         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4762                                 response, resp_size);
4763         if (!rv)
4764                 goto fail;
4765
4766         err = drbd_recv_header(tconn, &pi);
4767         if (err) {
4768                 rv = 0;
4769                 goto fail;
4770         }
4771
4772         if (pi.cmd != P_AUTH_RESPONSE) {
4773                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4774                          cmdname(pi.cmd), pi.cmd);
4775                 rv = 0;
4776                 goto fail;
4777         }
4778
4779         if (pi.size != resp_size) {
4780                 conn_err(tconn, "AuthResponse payload of wrong size\n");
4781                 rv = 0;
4782                 goto fail;
4783         }
4784
4785         err = drbd_recv_all_warn(tconn, response, resp_size);
4786         if (err) {
4787                 rv = 0;
4788                 goto fail;
4789         }
4790
4791         right_response = kmalloc(resp_size, GFP_NOIO);
4792         if (right_response == NULL) {
4793                 conn_err(tconn, "kmalloc of right_response failed\n");
4794                 rv = -1;
4795                 goto fail;
4796         }
4797
4798         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4799
4800         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4801         if (rv) {
4802                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4803                 rv = -1;
4804                 goto fail;
4805         }
4806
4807         rv = !memcmp(response, right_response, resp_size);
4808
4809         if (rv)
4810                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4811                      resp_size);
4812         else
4813                 rv = -1;
4814
4815  fail:
4816         kfree(peers_ch);
4817         kfree(response);
4818         kfree(right_response);
4819
4820         return rv;
4821 }
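
/* The resulting challenge-response exchange, as implemented above
 * (both peers run it symmetrically; HMAC() is the configured
 * cram-hmac-alg keyed with the shared secret):
 *
 *	we   -> peer : P_AUTH_CHALLENGE, 64 random bytes (my_challenge)
 *	peer -> we   : P_AUTH_CHALLENGE, the peer's random bytes (peers_ch)
 *	we   -> peer : P_AUTH_RESPONSE,  HMAC(peers_ch)
 *	peer -> we   : P_AUTH_RESPONSE,  HMAC(my_challenge)
 *
 * We accept the peer iff its response matches our locally computed
 * right_response == HMAC(my_challenge).  This authenticates both
 * ends, but does not encrypt or integrity-protect the data stream.
 */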
4822 #endif
4823
4824 int drbdd_init(struct drbd_thread *thi)
4825 {
4826         struct drbd_tconn *tconn = thi->tconn;
4827         int h;
4828
4829         conn_info(tconn, "receiver (re)started\n");
4830
4831         do {
4832                 h = conn_connect(tconn);
4833                 if (h == 0) {
4834                         conn_disconnect(tconn);
4835                         schedule_timeout_interruptible(HZ);
4836                 }
4837                 if (h == -1) {
4838                         conn_warn(tconn, "Discarding network configuration.\n");
4839                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4840                 }
4841         } while (h == 0);
4842
4843         if (h > 0)
4844                 drbdd(tconn);
4845
4846         conn_disconnect(tconn);
4847
4848         conn_info(tconn, "receiver terminated\n");
4849         return 0;
4850 }
4851
4852 /* ********* acknowledge sender ******** */
4853
4854 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4855 {
4856         struct p_req_state_reply *p = pi->data;
4857         int retcode = be32_to_cpu(p->retcode);
4858
4859         if (retcode >= SS_SUCCESS) {
4860                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4861         } else {
4862                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4863                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4864                          drbd_set_st_err_str(retcode), retcode);
4865         }
4866         wake_up(&tconn->ping_wait);
4867
4868         return 0;
4869 }
4870
4871 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4872 {
4873         struct drbd_conf *mdev;
4874         struct p_req_state_reply *p = pi->data;
4875         int retcode = be32_to_cpu(p->retcode);
4876
4877         mdev = vnr_to_mdev(tconn, pi->vnr);
4878         if (!mdev)
4879                 return -EIO;
4880
4881         if (test_bit(CONN_WD_ST_CHG_REQ, &tconn->flags)) {
4882                 D_ASSERT(tconn->agreed_pro_version < 100);
4883                 return got_conn_RqSReply(tconn, pi);
4884         }
4885
4886         if (retcode >= SS_SUCCESS) {
4887                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4888         } else {
4889                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4890                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4891                         drbd_set_st_err_str(retcode), retcode);
4892         }
4893         wake_up(&mdev->state_wait);
4894
4895         return 0;
4896 }
4897
4898 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4899 {
4900         return drbd_send_ping_ack(tconn);
4902 }
4903
4904 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4905 {
4906         /* restore idle timeout */
4907         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int * HZ;
4908         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4909                 wake_up(&tconn->ping_wait);
4910
4911         return 0;
4912 }
4913
4914 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4915 {
4916         struct drbd_conf *mdev;
4917         struct p_block_ack *p = pi->data;
4918         sector_t sector = be64_to_cpu(p->sector);
4919         int blksize = be32_to_cpu(p->blksize);
4920
4921         mdev = vnr_to_mdev(tconn, pi->vnr);
4922         if (!mdev)
4923                 return -EIO;
4924
4925         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4926
4927         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4928
4929         if (get_ldev(mdev)) {
4930                 drbd_rs_complete_io(mdev, sector);
4931                 drbd_set_in_sync(mdev, sector, blksize);
4932                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4933                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4934                 put_ldev(mdev);
4935         }
4936         dec_rs_pending(mdev);
4937         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4938
4939         return 0;
4940 }
4941
4942 static int
4943 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4944                               struct rb_root *root, const char *func,
4945                               enum drbd_req_event what, bool missing_ok)
4946 {
4947         struct drbd_request *req;
4948         struct bio_and_error m;
4949
4950         spin_lock_irq(&mdev->tconn->req_lock);
4951         req = find_request(mdev, root, id, sector, missing_ok, func);
4952         if (unlikely(!req)) {
4953                 spin_unlock_irq(&mdev->tconn->req_lock);
4954                 return -EIO;
4955         }
4956         __req_mod(req, what, &m);
4957         spin_unlock_irq(&mdev->tconn->req_lock);
4958
4959         if (m.bio)
4960                 complete_master_bio(mdev, &m);
4961         return 0;
4962 }
4963
4964 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4965 {
4966         struct drbd_conf *mdev;
4967         struct p_block_ack *p = pi->data;
4968         sector_t sector = be64_to_cpu(p->sector);
4969         int blksize = be32_to_cpu(p->blksize);
4970         enum drbd_req_event what;
4971
4972         mdev = vnr_to_mdev(tconn, pi->vnr);
4973         if (!mdev)
4974                 return -EIO;
4975
4976         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4977
4978         if (p->block_id == ID_SYNCER) {
4979                 drbd_set_in_sync(mdev, sector, blksize);
4980                 dec_rs_pending(mdev);
4981                 return 0;
4982         }
4983         switch (pi->cmd) {
4984         case P_RS_WRITE_ACK:
4985                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4986                 break;
4987         case P_WRITE_ACK:
4988                 what = WRITE_ACKED_BY_PEER;
4989                 break;
4990         case P_RECV_ACK:
4991                 what = RECV_ACKED_BY_PEER;
4992                 break;
4993         case P_SUPERSEDED:
4994                 what = CONFLICT_RESOLVED;
4995                 break;
4996         case P_RETRY_WRITE:
4997                 what = POSTPONE_WRITE;
4998                 break;
4999         default:
5000                 BUG();
5001         }
5002
5003         return validate_req_change_req_state(mdev, p->block_id, sector,
5004                                              &mdev->write_requests, __func__,
5005                                              what, false);
5006 }
5007
5008 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
5009 {
5010         struct drbd_conf *mdev;
5011         struct p_block_ack *p = pi->data;
5012         sector_t sector = be64_to_cpu(p->sector);
5013         int size = be32_to_cpu(p->blksize);
5014         int err;
5015
5016         mdev = vnr_to_mdev(tconn, pi->vnr);
5017         if (!mdev)
5018                 return -EIO;
5019
5020         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5021
5022         if (p->block_id == ID_SYNCER) {
5023                 dec_rs_pending(mdev);
5024                 drbd_rs_failed_io(mdev, sector, size);
5025                 return 0;
5026         }
5027
5028         err = validate_req_change_req_state(mdev, p->block_id, sector,
5029                                             &mdev->write_requests, __func__,
5030                                             NEG_ACKED, true);
5031         if (err) {
5032                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5033                    The master bio might already be completed, therefore the
5034                    request is no longer in the collision hash. */
5035                 /* In Protocol B we might already have got a P_RECV_ACK
5036                    but then get a P_NEG_ACK afterwards. */
5037                 drbd_set_out_of_sync(mdev, sector, size);
5038         }
5039         return 0;
5040 }
5041
5042 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5043 {
5044         struct drbd_conf *mdev;
5045         struct p_block_ack *p = pi->data;
5046         sector_t sector = be64_to_cpu(p->sector);
5047
5048         mdev = vnr_to_mdev(tconn, pi->vnr);
5049         if (!mdev)
5050                 return -EIO;
5051
5052         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5053
5054         dev_err(DEV, "Got NegDReply; Sector %llus, len %u.\n",
5055             (unsigned long long)sector, be32_to_cpu(p->blksize));
5056
5057         return validate_req_change_req_state(mdev, p->block_id, sector,
5058                                              &mdev->read_requests, __func__,
5059                                              NEG_ACKED, false);
5060 }
5061
5062 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5063 {
5064         struct drbd_conf *mdev;
5065         sector_t sector;
5066         int size;
5067         struct p_block_ack *p = pi->data;
5068
5069         mdev = vnr_to_mdev(tconn, pi->vnr);
5070         if (!mdev)
5071                 return -EIO;
5072
5073         sector = be64_to_cpu(p->sector);
5074         size = be32_to_cpu(p->blksize);
5075
5076         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5077
5078         dec_rs_pending(mdev);
5079
5080         if (get_ldev_if_state(mdev, D_FAILED)) {
5081                 drbd_rs_complete_io(mdev, sector);
5082                 switch (pi->cmd) {
5083                 case P_NEG_RS_DREPLY:
5084                         drbd_rs_failed_io(mdev, sector, size); /* fall through */
5085                 case P_RS_CANCEL:
5086                         break;
5087                 default:
5088                         BUG();
5089                 }
5090                 put_ldev(mdev);
5091         }
5092
5093         return 0;
5094 }
5095
5096 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
5097 {
5098         struct p_barrier_ack *p = pi->data;
5099         struct drbd_conf *mdev;
5100         int vnr;
5101
5102         tl_release(tconn, p->barrier, be32_to_cpu(p->set_size));
5103
5104         rcu_read_lock();
5105         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5106                 if (mdev->state.conn == C_AHEAD &&
5107                     atomic_read(&mdev->ap_in_flight) == 0 &&
5108                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
5109                         mdev->start_resync_timer.expires = jiffies + HZ;
5110                         add_timer(&mdev->start_resync_timer);
5111                 }
5112         }
5113         rcu_read_unlock();
5114
5115         return 0;
5116 }
5117
5118 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
5119 {
5120         struct drbd_conf *mdev;
5121         struct p_block_ack *p = pi->data;
5122         struct drbd_work *w;
5123         sector_t sector;
5124         int size;
5125
5126         mdev = vnr_to_mdev(tconn, pi->vnr);
5127         if (!mdev)
5128                 return -EIO;
5129
5130         sector = be64_to_cpu(p->sector);
5131         size = be32_to_cpu(p->blksize);
5132
5133         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5134
5135         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5136                 drbd_ov_out_of_sync_found(mdev, sector, size);
5137         else
5138                 ov_out_of_sync_print(mdev);
5139
5140         if (!get_ldev(mdev))
5141                 return 0;
5142
5143         drbd_rs_complete_io(mdev, sector);
5144         dec_rs_pending(mdev);
5145
5146         --mdev->ov_left;
5147
5148         /* let's advance progress step marks only for every other megabyte */
5149         if ((mdev->ov_left & 0x200) == 0x200)
5150                 drbd_advance_rs_marks(mdev, mdev->ov_left);
5151
5152         if (mdev->ov_left == 0) {
5153                 w = kmalloc(sizeof(*w), GFP_NOIO);
5154                 if (w) {
5155                         w->cb = w_ov_finished;
5156                         w->mdev = mdev;
5157                         drbd_queue_work(&mdev->tconn->sender_work, w);
5158                 } else {
5159                         dev_err(DEV, "kmalloc(w) failed.\n");
5160                         ov_out_of_sync_print(mdev);
5161                         drbd_resync_finished(mdev);
5162                 }
5163         }
5164         put_ldev(mdev);
5165         return 0;
5166 }
5167
5168 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5169 {
5170         return 0;
5171 }
5172
5173 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5174 {
5175         struct drbd_conf *mdev;
5176         int vnr, not_empty = 0;
5177
5178         do {
5179                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5180                 flush_signals(current);
5181
5182                 rcu_read_lock();
5183                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5184                         kref_get(&mdev->kref);
5185                         rcu_read_unlock();
5186                         if (drbd_finish_peer_reqs(mdev)) {
5187                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5188                                 return 1;
5189                         }
5190                         kref_put(&mdev->kref, &drbd_minor_destroy);
5191                         rcu_read_lock();
5192                 }
5193                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5194
5195                 spin_lock_irq(&tconn->req_lock);
5196                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5197                         not_empty = !list_empty(&mdev->done_ee);
5198                         if (not_empty)
5199                                 break;
5200                 }
5201                 spin_unlock_irq(&tconn->req_lock);
5202                 rcu_read_unlock();
5203         } while (not_empty);
5204
5205         return 0;
5206 }
5207
5208 struct asender_cmd {
5209         size_t pkt_size;
5210         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5211 };
5212
5213 static struct asender_cmd asender_tbl[] = {
5214         [P_PING]            = { 0, got_Ping },
5215         [P_PING_ACK]        = { 0, got_PingAck },
5216         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5217         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5218         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5219         [P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
5220         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5221         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5222         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5223         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5224         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5225         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5226         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5227         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5228         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5229         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5230         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5231 };
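
/* Receive-loop invariant used by drbd_asender() below: "expect" is the
 * total number of bytes the current packet needs (just the header at
 * first, header plus pkt_size once the command is known), while
 * "received" counts what has arrived so far, so short reads simply go
 * around the loop again.  For a P_WRITE_ACK with an assumed 16 byte
 * header (see drbd_header_size()) that is:
 *
 *	expect = 16;				(until the header is complete)
 *	expect = 16 + sizeof(struct p_block_ack);	(then the payload)
 */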
5232
5233 int drbd_asender(struct drbd_thread *thi)
5234 {
5235         struct drbd_tconn *tconn = thi->tconn;
5236         struct asender_cmd *cmd = NULL;
5237         struct packet_info pi;
5238         int rv;
5239         void *buf    = tconn->meta.rbuf;
5240         int received = 0;
5241         unsigned int header_size = drbd_header_size(tconn);
5242         int expect   = header_size;
5243         bool ping_timeout_active = false;
5244         struct net_conf *nc;
5245         int ping_timeo, tcp_cork, ping_int;
5246
5247         current->policy = SCHED_RR;  /* Make this a realtime task! */
5248         current->rt_priority = 2;    /* more important than all other tasks */
5249
5250         while (get_t_state(thi) == RUNNING) {
5251                 drbd_thread_current_set_cpu(thi);
5252
5253                 rcu_read_lock();
5254                 nc = rcu_dereference(tconn->net_conf);
5255                 ping_timeo = nc->ping_timeo;
5256                 tcp_cork = nc->tcp_cork;
5257                 ping_int = nc->ping_int;
5258                 rcu_read_unlock();
5259
5260                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5261                         if (drbd_send_ping(tconn)) {
5262                                 conn_err(tconn, "drbd_send_ping has failed\n");
5263                                 goto reconnect;
5264                         }
5265                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5266                         ping_timeout_active = true;
5267                 }
5268
5269                 /* TODO: conditionally cork; it may hurt latency if we cork without
5270                    much to send */
5271                 if (tcp_cork)
5272                         drbd_tcp_cork(tconn->meta.socket);
5273                 if (tconn_finish_peer_reqs(tconn)) {
5274                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5275                         goto reconnect;
5276                 }
5277                 /* but unconditionally uncork unless disabled */
5278                 if (tcp_cork)
5279                         drbd_tcp_uncork(tconn->meta.socket);
5280
5281                 /* short circuit, recv_msg would return EINTR anyways. */
5282                 if (signal_pending(current))
5283                         continue;
5284
5285                 rv = drbd_recv_short(tconn->meta.socket, buf, expect - received, 0);
5286                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5287
5288                 flush_signals(current);
5289
5290                 /* Note:
5291                  * -EINTR        (on meta) we got a signal
5292                  * -EAGAIN       (on meta) rcvtimeo expired
5293                  * -ECONNRESET   other side closed the connection
5294                  * -ERESTARTSYS  (on data) we got a signal
5295                  * rv <  0       other than above: unexpected error!
5296                  * rv == expected: full header or command
5297                  * rv <  expected: "woken" by signal during receive
5298                  * rv == 0       : "connection shut down by peer"
5299                  */
5300                 if (likely(rv > 0)) {
5301                         received += rv;
5302                         buf      += rv;
5303                 } else if (rv == 0) {
5304                         if (test_bit(DISCONNECT_SENT, &tconn->flags)) {
5305                                 long t;
5306                                 rcu_read_lock();
5307                                 t = rcu_dereference(tconn->net_conf)->ping_timeo * HZ/10;
5308                                 rcu_read_unlock();
5309
5310                                 t = wait_event_timeout(tconn->ping_wait,
5311                                                        tconn->cstate < C_WF_REPORT_PARAMS,
5312                                                        t);
5313                                 if (t)
5314                                         break;
5315                         }
5316                         conn_err(tconn, "meta connection shut down by peer.\n");
5317                         goto reconnect;
5318                 } else if (rv == -EAGAIN) {
5319                         /* If the data socket received something meanwhile,
5320                          * that is good enough: peer is still alive. */
5321                         if (time_after(tconn->last_received,
5322                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5323                                 continue;
5324                         if (ping_timeout_active) {
5325                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5326                                 goto reconnect;
5327                         }
5328                         set_bit(SEND_PING, &tconn->flags);
5329                         continue;
5330                 } else if (rv == -EINTR) {
5331                         continue;
5332                 } else {
5333                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5334                         goto reconnect;
5335                 }
5336
5337                 if (received == expect && cmd == NULL) {
5338                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5339                                 goto reconnect;
5340                         cmd = &asender_tbl[pi.cmd];
5341                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5342                                 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5343                                          cmdname(pi.cmd), pi.cmd);
5344                                 goto disconnect;
5345                         }
5346                         expect = header_size + cmd->pkt_size;
5347                         if (pi.size != expect - header_size) {
5348                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5349                                         pi.cmd, pi.size);
5350                                 goto reconnect;
5351                         }
5352                 }
5353                 if (received == expect) {
5354                         int err;
5355
5356                         err = cmd->fn(tconn, &pi);
5357                         if (err) {
5358                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5359                                 goto reconnect;
5360                         }
5361
5362                         tconn->last_received = jiffies;
5363
5364                         if (cmd == &asender_tbl[P_PING_ACK]) {
5365                                 /* restore idle timeout */
5366                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5367                                 ping_timeout_active = false;
5368                         }
5369
5370                         buf      = tconn->meta.rbuf;
5371                         received = 0;
5372                         expect   = header_size;
5373                         cmd      = NULL;
5374                 }
5375         }
5376
5377         if (0) {
5378 reconnect:
5379                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5380                 conn_md_sync(tconn);
5381         }
5382         if (0) {
5383 disconnect:
5384                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5385         }
5386         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5387
5388         conn_info(tconn, "asender terminated\n");
5389
5390         return 0;
5391 }