]> git.karo-electronics.de Git - mv-sheeva.git/blobdiff - drivers/block/drbd/drbd_receiver.c
Merge tag 'v2.6.37' of git://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux-2.6
[mv-sheeva.git] / drivers / block / drbd / drbd_receiver.c
index 081522d3c7424046332d0eafc5baf76ece7acc8f..24487d4fb20297e6676a91e60c9525f26092e315 100644 (file)
@@ -36,7 +36,6 @@
 #include <linux/memcontrol.h>
 #include <linux/mm_inline.h>
 #include <linux/slab.h>
-#include <linux/smp_lock.h>
 #include <linux/pkt_sched.h>
 #define __KERNEL_SYSCALLS__
 #include <linux/unistd.h>
 
 #include "drbd_vli.h"
 
-struct flush_work {
-       struct drbd_work w;
-       struct drbd_epoch *epoch;
-};
-
 enum finish_epoch {
        FE_STILL_LIVE,
        FE_DESTROYED,
@@ -66,16 +60,6 @@ static int drbd_do_auth(struct drbd_conf *mdev);
 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
 
-static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
-{
-       struct drbd_epoch *prev;
-       spin_lock(&mdev->epoch_lock);
-       prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
-       if (prev == epoch || prev == mdev->current_epoch)
-               prev = NULL;
-       spin_unlock(&mdev->epoch_lock);
-       return prev;
-}
 
 #define GFP_TRY        (__GFP_HIGHMEM | __GFP_NOWARN)
 
@@ -241,7 +225,7 @@ static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
        spin_unlock_irq(&mdev->req_lock);
 
        list_for_each_entry_safe(e, t, &reclaimed, w.list)
-               drbd_free_ee(mdev, e);
+               drbd_free_net_ee(mdev, e);
 }
 
 /**
@@ -298,9 +282,11 @@ static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool
  * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
  * Either links the page chain back to the global pool,
  * or returns all pages to the system. */
-static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
+static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
 {
+       atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
        int i;
+
        if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
                i = page_chain_free(page);
        else {
@@ -311,10 +297,10 @@ static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
-       atomic_sub(i, &mdev->pp_in_use);
-       i = atomic_read(&mdev->pp_in_use);
+       i = atomic_sub_return(i, a);
        if (i < 0)
-               dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
+               dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
+                       is_net ? "pp_in_use_by_net" : "pp_in_use", i);
        wake_up(&drbd_pp_wait);
 }
 
@@ -365,7 +351,6 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
        e->size = data_size;
        e->flags = 0;
        e->sector = sector;
-       e->sector = sector;
        e->block_id = id;
 
        return e;
@@ -375,9 +360,11 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
        return NULL;
 }
 
-void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
+void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
 {
-       drbd_pp_free(mdev, e->pages);
+       if (e->flags & EE_HAS_DIGEST)
+               kfree(e->digest);
+       drbd_pp_free(mdev, e->pages, is_net);
        D_ASSERT(atomic_read(&e->pending_bios) == 0);
        D_ASSERT(hlist_unhashed(&e->colision));
        mempool_free(e, drbd_ee_mempool);
@@ -388,13 +375,14 @@ int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
        LIST_HEAD(work_list);
        struct drbd_epoch_entry *e, *t;
        int count = 0;
+       int is_net = list == &mdev->net_ee;
 
        spin_lock_irq(&mdev->req_lock);
        list_splice_init(list, &work_list);
        spin_unlock_irq(&mdev->req_lock);
 
        list_for_each_entry_safe(e, t, &work_list, w.list) {
-               drbd_free_ee(mdev, e);
+               drbd_free_some_ee(mdev, e, is_net);
                count++;
        }
        return count;
@@ -423,7 +411,7 @@ static int drbd_process_done_ee(struct drbd_conf *mdev)
        spin_unlock_irq(&mdev->req_lock);
 
        list_for_each_entry_safe(e, t, &reclaimed, w.list)
-               drbd_free_ee(mdev, e);
+               drbd_free_net_ee(mdev, e);
 
        /* possible callbacks here:
         * e_end_block, and e_end_resync_block, e_send_discard_ack.
@@ -719,14 +707,14 @@ out:
 static int drbd_send_fp(struct drbd_conf *mdev,
        struct socket *sock, enum drbd_packets cmd)
 {
-       struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
+       struct p_header80 *h = &mdev->data.sbuf.header.h80;
 
        return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
 }
 
 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
 {
-       struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
+       struct p_header80 *h = &mdev->data.rbuf.header.h80;
        int rr;
 
        rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
@@ -776,9 +764,6 @@ static int drbd_connect(struct drbd_conf *mdev)
 
        D_ASSERT(!mdev->data.socket);
 
-       if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
-               dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
-
        if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
                return -2;
 
@@ -927,6 +912,11 @@ retry:
 
        drbd_thread_start(&mdev->asender);
 
+       if (mdev->agreed_pro_version < 95 && get_ldev(mdev)) {
+               drbd_setup_queue_param(mdev, DRBD_MAX_SIZE_H80_PACKET);
+               put_ldev(mdev);
+       }
+
        if (!drbd_send_protocol(mdev))
                return -1;
        drbd_send_sync_param(mdev, &mdev->sync_conf);
@@ -946,22 +936,28 @@ out_release_sockets:
        return -1;
 }
 
-static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
+static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
 {
+       union p_header *h = &mdev->data.rbuf.header;
        int r;
 
        r = drbd_recv(mdev, h, sizeof(*h));
-
        if (unlikely(r != sizeof(*h))) {
                dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
                return FALSE;
-       };
-       h->command = be16_to_cpu(h->command);
-       h->length  = be16_to_cpu(h->length);
-       if (unlikely(h->magic != BE_DRBD_MAGIC)) {
-               dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
-                   (long)be32_to_cpu(h->magic),
-                   h->command, h->length);
+       }
+
+       if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
+               *cmd = be16_to_cpu(h->h80.command);
+               *packet_size = be16_to_cpu(h->h80.length);
+       } else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
+               *cmd = be16_to_cpu(h->h95.command);
+               *packet_size = be32_to_cpu(h->h95.length);
+       } else {
+               dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
+                   be32_to_cpu(h->h80.magic),
+                   be16_to_cpu(h->h80.command),
+                   be16_to_cpu(h->h80.length));
                return FALSE;
        }
        mdev->last_received = jiffies;
@@ -969,13 +965,13 @@ static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
+static void drbd_flush(struct drbd_conf *mdev)
 {
        int rv;
 
        if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
                rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
-                                       NULL, BLKDEV_IFL_WAIT);
+                                       NULL);
                if (rv) {
                        dev_err(DEV, "local disk flush failed with status %d\n", rv);
                        /* would rather check on EOPNOTSUPP, but that is not reliable.
@@ -985,24 +981,6 @@ static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct d
                }
                put_ldev(mdev);
        }
-
-       return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
-}
-
-static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
-{
-       struct flush_work *fw = (struct flush_work *)w;
-       struct drbd_epoch *epoch = fw->epoch;
-
-       kfree(w);
-
-       if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
-               drbd_flush_after_epoch(mdev, epoch);
-
-       drbd_may_finish_epoch(mdev, epoch, EV_PUT |
-                             (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
-
-       return 1;
 }
 
 /**
@@ -1015,15 +993,13 @@ static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
                                               struct drbd_epoch *epoch,
                                               enum epoch_event ev)
 {
-       int finish, epoch_size;
+       int epoch_size;
        struct drbd_epoch *next_epoch;
-       int schedule_flush = 0;
        enum finish_epoch rv = FE_STILL_LIVE;
 
        spin_lock(&mdev->epoch_lock);
        do {
                next_epoch = NULL;
-               finish = 0;
 
                epoch_size = atomic_read(&epoch->epoch_size);
 
@@ -1033,16 +1009,6 @@ static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
                        break;
                case EV_GOT_BARRIER_NR:
                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
-
-                       /* Special case: If we just switched from WO_bio_barrier to
-                          WO_bdev_flush we should not finish the current epoch */
-                       if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
-                           mdev->write_ordering != WO_bio_barrier &&
-                           epoch == mdev->current_epoch)
-                               clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
-                       break;
-               case EV_BARRIER_DONE:
-                       set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
                        break;
                case EV_BECAME_LAST:
                        /* nothing to do*/
@@ -1051,23 +1017,7 @@ static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
 
                if (epoch_size != 0 &&
                    atomic_read(&epoch->active) == 0 &&
-                   test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
-                   epoch->list.prev == &mdev->current_epoch->list &&
-                   !test_bit(DE_IS_FINISHING, &epoch->flags)) {
-                       /* Nearly all conditions are met to finish that epoch... */
-                       if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
-                           mdev->write_ordering == WO_none ||
-                           (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
-                           ev & EV_CLEANUP) {
-                               finish = 1;
-                               set_bit(DE_IS_FINISHING, &epoch->flags);
-                       } else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
-                                mdev->write_ordering == WO_bio_barrier) {
-                               atomic_inc(&epoch->active);
-                               schedule_flush = 1;
-                       }
-               }
-               if (finish) {
+                   test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
                        if (!(ev & EV_CLEANUP)) {
                                spin_unlock(&mdev->epoch_lock);
                                drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
@@ -1090,6 +1040,7 @@ static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
                                /* atomic_set(&epoch->active, 0); is already zero */
                                if (rv == FE_STILL_LIVE)
                                        rv = FE_RECYCLED;
+                               wake_up(&mdev->ee_wait);
                        }
                }
 
@@ -1101,22 +1052,6 @@ static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
 
        spin_unlock(&mdev->epoch_lock);
 
-       if (schedule_flush) {
-               struct flush_work *fw;
-               fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
-               if (fw) {
-                       fw->w.cb = w_flush;
-                       fw->epoch = epoch;
-                       drbd_queue_work(&mdev->data.work, &fw->w);
-               } else {
-                       dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
-                       set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
-                       /* That is not a recursion, only one level */
-                       drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
-                       drbd_may_finish_epoch(mdev, epoch, EV_PUT);
-               }
-       }
-
        return rv;
 }
 
@@ -1132,19 +1067,16 @@ void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo)
                [WO_none] = "none",
                [WO_drain_io] = "drain",
                [WO_bdev_flush] = "flush",
-               [WO_bio_barrier] = "barrier",
        };
 
        pwo = mdev->write_ordering;
        wo = min(pwo, wo);
-       if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
-               wo = WO_bdev_flush;
        if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
                wo = WO_drain_io;
        if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
                wo = WO_none;
        mdev->write_ordering = wo;
-       if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
+       if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
                dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
 }
 
@@ -1180,7 +1112,7 @@ next_bio:
        bio->bi_sector = sector;
        bio->bi_bdev = mdev->ldev->backing_bdev;
        /* we special case some flags in the multi-bio case, see below
-        * (REQ_UNPLUG, REQ_HARDBARRIER) */
+        * (REQ_UNPLUG) */
        bio->bi_rw = rw;
        bio->bi_private = e;
        bio->bi_end_io = drbd_endio_sec;
@@ -1214,11 +1146,6 @@ next_bio:
                        bio->bi_rw &= ~REQ_UNPLUG;
 
                drbd_generic_make_request(mdev, fault_type, bio);
-
-               /* strip off REQ_HARDBARRIER,
-                * unless it is the first or last bio */
-               if (bios && bios->bi_next)
-                       bios->bi_rw &= ~REQ_HARDBARRIER;
        } while (bios);
        maybe_kick_lo(mdev);
        return 0;
@@ -1232,53 +1159,12 @@ fail:
        return -ENOMEM;
 }
 
-/**
- * w_e_reissue() - Worker callback; Resubmit a bio, without REQ_HARDBARRIER set
- * @mdev:      DRBD device.
- * @w:         work object.
- * @cancel:    The connection will be closed anyways (unused in this callback)
- */
-int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
-{
-       struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
-       /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
-          (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
-          so that we can finish that epoch in drbd_may_finish_epoch().
-          That is necessary if we already have a long chain of Epochs, before
-          we realize that REQ_HARDBARRIER is actually not supported */
-
-       /* As long as the -ENOTSUPP on the barrier is reported immediately
-          that will never trigger. If it is reported late, we will just
-          print that warning and continue correctly for all future requests
-          with WO_bdev_flush */
-       if (previous_epoch(mdev, e->epoch))
-               dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
-
-       /* we still have a local reference,
-        * get_ldev was done in receive_Data. */
-
-       e->w.cb = e_end_block;
-       if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
-               /* drbd_submit_ee fails for one reason only:
-                * if was not able to allocate sufficient bios.
-                * requeue, try again later. */
-               e->w.cb = w_e_reissue;
-               drbd_queue_work(&mdev->data.work, &e->w);
-       }
-       return 1;
-}
-
-static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
+static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-       int rv, issue_flush;
-       struct p_barrier *p = (struct p_barrier *)h;
+       int rv;
+       struct p_barrier *p = &mdev->data.rbuf.barrier;
        struct drbd_epoch *epoch;
 
-       ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
-
-       rv = drbd_recv(mdev, h->payload, h->length);
-       ERR_IF(rv != h->length) return FALSE;
-
        inc_unacked(mdev);
 
        if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
@@ -1293,44 +1179,40 @@ static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
         * Therefore we must send the barrier_ack after the barrier request was
         * completed. */
        switch (mdev->write_ordering) {
-       case WO_bio_barrier:
        case WO_none:
                if (rv == FE_RECYCLED)
                        return TRUE;
-               break;
+
+               /* receiver context, in the writeout path of the other node.
+                * avoid potential distributed deadlock */
+               epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
+               if (epoch)
+                       break;
+               else
+                       dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
+                       /* Fall through */
 
        case WO_bdev_flush:
        case WO_drain_io:
-               if (rv == FE_STILL_LIVE) {
-                       set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
-                       drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
-                       rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
-               }
-               if (rv == FE_RECYCLED)
-                       return TRUE;
-
-               /* The asender will send all the ACKs and barrier ACKs out, since
-                  all EEs moved from the active_ee to the done_ee. We need to
-                  provide a new epoch object for the EEs that come in soon */
-               break;
-       }
-
-       /* receiver context, in the writeout path of the other node.
-        * avoid potential distributed deadlock */
-       epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
-       if (!epoch) {
-               dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
-               issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
                drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
-               if (issue_flush) {
-                       rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
-                       if (rv == FE_RECYCLED)
-                               return TRUE;
+               drbd_flush(mdev);
+
+               if (atomic_read(&mdev->current_epoch->epoch_size)) {
+                       epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
+                       if (epoch)
+                               break;
                }
 
-               drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
+               epoch = mdev->current_epoch;
+               wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
+
+               D_ASSERT(atomic_read(&epoch->active) == 0);
+               D_ASSERT(epoch->flags == 0);
 
                return TRUE;
+       default:
+               dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
+               return FALSE;
        }
 
        epoch->flags = 0;
@@ -1457,7 +1339,7 @@ static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
                data_size -= rr;
        }
        kunmap(page);
-       drbd_pp_free(mdev, page);
+       drbd_pp_free(mdev, page, 0);
        return rv;
 }
 
@@ -1562,30 +1444,29 @@ static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_si
        list_add(&e->w.list, &mdev->sync_ee);
        spin_unlock_irq(&mdev->req_lock);
 
+       atomic_add(data_size >> 9, &mdev->rs_sect_ev);
        if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
                return TRUE;
 
+       /* drbd_submit_ee currently fails for one reason only:
+        * not being able to allocate enough bios.
+        * Is dropping the connection going to help? */
+       spin_lock_irq(&mdev->req_lock);
+       list_del(&e->w.list);
+       spin_unlock_irq(&mdev->req_lock);
+
        drbd_free_ee(mdev, e);
 fail:
        put_ldev(mdev);
        return FALSE;
 }
 
-static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
+static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        struct drbd_request *req;
        sector_t sector;
-       unsigned int header_size, data_size;
        int ok;
-       struct p_data *p = (struct p_data *)h;
-
-       header_size = sizeof(*p) - sizeof(*h);
-       data_size   = h->length  - header_size;
-
-       ERR_IF(data_size == 0) return FALSE;
-
-       if (drbd_recv(mdev, h->payload, header_size) != header_size)
-               return FALSE;
+       struct p_data *p = &mdev->data.rbuf.data;
 
        sector = be64_to_cpu(p->sector);
 
@@ -1611,20 +1492,11 @@ static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
        return ok;
 }
 
-static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
+static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        sector_t sector;
-       unsigned int header_size, data_size;
        int ok;
-       struct p_data *p = (struct p_data *)h;
-
-       header_size = sizeof(*p) - sizeof(*h);
-       data_size   = h->length  - header_size;
-
-       ERR_IF(data_size == 0) return FALSE;
-
-       if (drbd_recv(mdev, h->payload, header_size) != header_size)
-               return FALSE;
+       struct p_data *p = &mdev->data.rbuf.data;
 
        sector = be64_to_cpu(p->sector);
        D_ASSERT(p->block_id == ID_SYNCER);
@@ -1640,9 +1512,11 @@ static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
 
                ok = drbd_drain_block(mdev, data_size);
 
-               drbd_send_ack_dp(mdev, P_NEG_ACK, p);
+               drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
        }
 
+       atomic_add(data_size >> 9, &mdev->rs_sect_in);
+
        return ok;
 }
 
@@ -1653,15 +1527,8 @@ static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 {
        struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
        sector_t sector = e->sector;
-       struct drbd_epoch *epoch;
        int ok = 1, pcmd;
 
-       if (e->flags & EE_IS_BARRIER) {
-               epoch = previous_epoch(mdev, e->epoch);
-               if (epoch)
-                       drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
-       }
-
        if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
                if (likely((e->flags & EE_WAS_ERROR) == 0)) {
                        pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
@@ -1765,24 +1632,27 @@ static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
        return ret;
 }
 
+static unsigned long write_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
+{
+       if (mdev->agreed_pro_version >= 95)
+               return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
+                       (dpf & DP_UNPLUG ? REQ_UNPLUG : 0) |
+                       (dpf & DP_FUA ? REQ_FUA : 0) |
+                       (dpf & DP_FLUSH ? REQ_FUA : 0) |
+                       (dpf & DP_DISCARD ? REQ_DISCARD : 0);
+       else
+               return dpf & DP_RW_SYNC ? (REQ_SYNC | REQ_UNPLUG) : 0;
+}
+
 /* mirrored write */
-static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
+static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        sector_t sector;
        struct drbd_epoch_entry *e;
-       struct p_data *p = (struct p_data *)h;
-       int header_size, data_size;
+       struct p_data *p = &mdev->data.rbuf.data;
        int rw = WRITE;
        u32 dp_flags;
 
-       header_size = sizeof(*p) - sizeof(*h);
-       data_size   = h->length  - header_size;
-
-       ERR_IF(data_size == 0) return FALSE;
-
-       if (drbd_recv(mdev, h->payload, header_size) != header_size)
-               return FALSE;
-
        if (!get_ldev(mdev)) {
                if (__ratelimit(&drbd_ratelimit_state))
                        dev_err(DEV, "Can not write mirrored data block "
@@ -1792,7 +1662,7 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
                        mdev->peer_seq++;
                spin_unlock(&mdev->peer_seq_lock);
 
-               drbd_send_ack_dp(mdev, P_NEG_ACK, p);
+               drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
                atomic_inc(&mdev->current_epoch->epoch_size);
                return drbd_drain_block(mdev, data_size);
        }
@@ -1815,36 +1685,11 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
        e->epoch = mdev->current_epoch;
        atomic_inc(&e->epoch->epoch_size);
        atomic_inc(&e->epoch->active);
-
-       if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
-               struct drbd_epoch *epoch;
-               /* Issue a barrier if we start a new epoch, and the previous epoch
-                  was not a epoch containing a single request which already was
-                  a Barrier. */
-               epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
-               if (epoch == e->epoch) {
-                       set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
-                       rw |= REQ_HARDBARRIER;
-                       e->flags |= EE_IS_BARRIER;
-               } else {
-                       if (atomic_read(&epoch->epoch_size) > 1 ||
-                           !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
-                               set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
-                               set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
-                               rw |= REQ_HARDBARRIER;
-                               e->flags |= EE_IS_BARRIER;
-                       }
-               }
-       }
        spin_unlock(&mdev->epoch_lock);
 
        dp_flags = be32_to_cpu(p->dp_flags);
-       if (dp_flags & DP_HARDBARRIER) {
-               dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
-               /* rw |= REQ_HARDBARRIER; */
-       }
-       if (dp_flags & DP_RW_SYNC)
-               rw |= REQ_SYNC | REQ_UNPLUG;
+       rw |= write_flags_to_bio(mdev, dp_flags);
+
        if (dp_flags & DP_MAY_SET_IN_SYNC)
                e->flags |= EE_MAY_SET_IN_SYNC;
 
@@ -1997,16 +1842,27 @@ static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
                break;
        }
 
-       if (mdev->state.pdsk == D_DISKLESS) {
+       if (mdev->state.pdsk < D_INCONSISTENT) {
                /* In case we have the only disk of the cluster, */
                drbd_set_out_of_sync(mdev, e->sector, e->size);
                e->flags |= EE_CALL_AL_COMPLETE_IO;
+               e->flags &= ~EE_MAY_SET_IN_SYNC;
                drbd_al_begin_io(mdev, e->sector);
        }
 
        if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
                return TRUE;
 
+       /* drbd_submit_ee currently fails for one reason only:
+        * not being able to allocate enough bios.
+        * Is dropping the connection going to help? */
+       spin_lock_irq(&mdev->req_lock);
+       list_del(&e->w.list);
+       hlist_del_init(&e->colision);
+       spin_unlock_irq(&mdev->req_lock);
+       if (e->flags & EE_CALL_AL_COMPLETE_IO)
+               drbd_al_complete_io(mdev, e->sector);
+
 out_interrupted:
        /* yes, the epoch_size now is imbalanced.
         * but we drop the connection anyways, so we don't have a chance to
@@ -2016,20 +1872,64 @@ out_interrupted:
        return FALSE;
 }
 
-static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
+/* We may throttle resync, if the lower device seems to be busy,
+ * and current sync rate is above c_min_rate.
+ *
+ * To decide whether or not the lower device is busy, we use a scheme similar
+ * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
+ * (more than 64 sectors) of activity we cannot account for with our own resync
+ * activity, it obviously is "busy".
+ *
+ * The current sync rate used here uses only the most recent two step marks,
+ * to have a short time average so we can react faster.
+ */
+int drbd_rs_should_slow_down(struct drbd_conf *mdev)
+{
+       struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
+       unsigned long db, dt, dbdt;
+       int curr_events;
+       int throttle = 0;
+
+       /* feature disabled? */
+       if (mdev->sync_conf.c_min_rate == 0)
+               return 0;
+
+       curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
+                     (int)part_stat_read(&disk->part0, sectors[1]) -
+                       atomic_read(&mdev->rs_sect_ev);
+       if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
+               unsigned long rs_left;
+               int i;
+
+               mdev->rs_last_events = curr_events;
+
+               /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
+                * approx. */
+               i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-2) % DRBD_SYNC_MARKS;
+               rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
+
+               dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
+               if (!dt)
+                       dt++;
+               db = mdev->rs_mark_left[i] - rs_left;
+               dbdt = Bit2KB(db/dt);
+
+               if (dbdt > mdev->sync_conf.c_min_rate)
+                       throttle = 1;
+       }
+       return throttle;
+}
+
+
+static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
 {
        sector_t sector;
        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
        struct drbd_epoch_entry *e;
        struct digest_info *di = NULL;
-       int size, digest_size;
+       int size, verb;
        unsigned int fault_type;
-       struct p_block_req *p =
-               (struct p_block_req *)h;
-       const int brps = sizeof(*p)-sizeof(*h);
-
-       if (drbd_recv(mdev, h->payload, brps) != brps)
-               return FALSE;
+       struct p_block_req *p = &mdev->data.rbuf.block_req;
 
        sector = be64_to_cpu(p->sector);
        size   = be32_to_cpu(p->blksize);
@@ -2046,12 +1946,31 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
        }
 
        if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
-               if (__ratelimit(&drbd_ratelimit_state))
+               verb = 1;
+               switch (cmd) {
+               case P_DATA_REQUEST:
+                       drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
+                       break;
+               case P_RS_DATA_REQUEST:
+               case P_CSUM_RS_REQUEST:
+               case P_OV_REQUEST:
+                       drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
+                       break;
+               case P_OV_REPLY:
+                       verb = 0;
+                       dec_rs_pending(mdev);
+                       drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
+                       break;
+               default:
+                       dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
+                               cmdname(cmd));
+               }
+               if (verb && __ratelimit(&drbd_ratelimit_state))
                        dev_err(DEV, "Can not satisfy peer's read request, "
                            "no local data.\n");
-               drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
-                                P_NEG_RS_DREPLY , p);
-               return drbd_drain_block(mdev, h->length - brps);
+
+               /* drain possibly payload */
+               return drbd_drain_block(mdev, digest_size);
        }
 
        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
@@ -2063,31 +1982,21 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
                return FALSE;
        }
 
-       switch (h->command) {
+       switch (cmd) {
        case P_DATA_REQUEST:
                e->w.cb = w_e_end_data_req;
                fault_type = DRBD_FAULT_DT_RD;
-               break;
+               /* application IO, don't drbd_rs_begin_io */
+               goto submit;
+
        case P_RS_DATA_REQUEST:
                e->w.cb = w_e_end_rsdata_req;
                fault_type = DRBD_FAULT_RS_RD;
-               /* Eventually this should become asynchronously. Currently it
-                * blocks the whole receiver just to delay the reading of a
-                * resync data block.
-                * the drbd_work_queue mechanism is made for this...
-                */
-               if (!drbd_rs_begin_io(mdev, sector)) {
-                       /* we have been interrupted,
-                        * probably connection lost! */
-                       D_ASSERT(signal_pending(current));
-                       goto out_free_e;
-               }
                break;
 
        case P_OV_REPLY:
        case P_CSUM_RS_REQUEST:
                fault_type = DRBD_FAULT_RS_RD;
-               digest_size = h->length - brps ;
                di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
                if (!di)
                        goto out_free_e;
@@ -2095,31 +2004,25 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
                di->digest_size = digest_size;
                di->digest = (((char *)di)+sizeof(struct digest_info));
 
+               e->digest = di;
+               e->flags |= EE_HAS_DIGEST;
+
                if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
                        goto out_free_e;
 
-               e->block_id = (u64)(unsigned long)di;
-               if (h->command == P_CSUM_RS_REQUEST) {
+               if (cmd == P_CSUM_RS_REQUEST) {
                        D_ASSERT(mdev->agreed_pro_version >= 89);
                        e->w.cb = w_e_end_csum_rs_req;
-               } else if (h->command == P_OV_REPLY) {
+               } else if (cmd == P_OV_REPLY) {
                        e->w.cb = w_e_end_ov_reply;
                        dec_rs_pending(mdev);
-                       break;
-               }
-
-               if (!drbd_rs_begin_io(mdev, sector)) {
-                       /* we have been interrupted, probably connection lost! */
-                       D_ASSERT(signal_pending(current));
-                       goto out_free_e;
+                       /* drbd_rs_begin_io done when we sent this request,
+                        * but accounting still needs to be done. */
+                       goto submit_for_resync;
                }
                break;
 
        case P_OV_REQUEST:
-               if (mdev->state.conn >= C_CONNECTED &&
-                   mdev->state.conn != C_VERIFY_T)
-                       dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
-                               drbd_conn_str(mdev->state.conn));
                if (mdev->ov_start_sector == ~(sector_t)0 &&
                    mdev->agreed_pro_version >= 90) {
                        mdev->ov_start_sector = sector;
@@ -2130,37 +2033,63 @@ static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
                }
                e->w.cb = w_e_end_ov_req;
                fault_type = DRBD_FAULT_RS_RD;
-               /* Eventually this should become asynchronous. Currently it
-                * blocks the whole receiver just to delay the reading of a
-                * resync data block.
-                * the drbd_work_queue mechanism is made for this...
-                */
-               if (!drbd_rs_begin_io(mdev, sector)) {
-                       /* we have been interrupted,
-                        * probably connection lost! */
-                       D_ASSERT(signal_pending(current));
-                       goto out_free_e;
-               }
                break;
 
-
        default:
                dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
-                   cmdname(h->command));
+                   cmdname(cmd));
                fault_type = DRBD_FAULT_MAX;
+               goto out_free_e;
        }
 
-       spin_lock_irq(&mdev->req_lock);
-       list_add(&e->w.list, &mdev->read_ee);
-       spin_unlock_irq(&mdev->req_lock);
+       /* Throttle, drbd_rs_begin_io and submit should become asynchronous
+        * wrt the receiver, but it is not as straightforward as it may seem.
+        * Various places in the resync start and stop logic assume resync
+        * requests are processed in order, requeuing this on the worker thread
+        * introduces a bunch of new code for synchronization between threads.
+        *
+        * Unlimited throttling before drbd_rs_begin_io may stall the resync
+        * "forever", throttling after drbd_rs_begin_io will lock that extent
+        * for application writes for the same time.  For now, just throttle
+        * here, where the rest of the code expects the receiver to sleep for
+        * a while, anyways.
+        */
+
+       /* Throttle before drbd_rs_begin_io, as that locks out application IO;
+        * this defers syncer requests for some time, before letting at least
+        * on request through.  The resync controller on the receiving side
+        * will adapt to the incoming rate accordingly.
+        *
+        * We cannot throttle here if remote is Primary/SyncTarget:
+        * we would also throttle its application reads.
+        * In that case, throttling is done on the SyncTarget only.
+        */
+       if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev))
+               msleep(100);
+       if (drbd_rs_begin_io(mdev, e->sector))
+               goto out_free_e;
 
+submit_for_resync:
+       atomic_add(size >> 9, &mdev->rs_sect_ev);
+
+submit:
        inc_unacked(mdev);
+       spin_lock_irq(&mdev->req_lock);
+       list_add_tail(&e->w.list, &mdev->read_ee);
+       spin_unlock_irq(&mdev->req_lock);
 
        if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
                return TRUE;
 
+       /* drbd_submit_ee currently fails for one reason only:
+        * not being able to allocate enough bios.
+        * Is dropping the connection going to help? */
+       spin_lock_irq(&mdev->req_lock);
+       list_del(&e->w.list);
+       spin_unlock_irq(&mdev->req_lock);
+       /* no drbd_rs_complete_io(), we are dropping the connection anyways */
+
 out_free_e:
-       kfree(di);
        put_ldev(mdev);
        drbd_free_ee(mdev, e);
        return FALSE;
@@ -2699,20 +2628,13 @@ static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
        return 1;
 }
 
-static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
+static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-       struct p_protocol *p = (struct p_protocol *)h;
-       int header_size, data_size;
+       struct p_protocol *p = &mdev->data.rbuf.protocol;
        int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
        int p_want_lose, p_two_primaries, cf;
        char p_integrity_alg[SHARED_SECRET_MAX] = "";
 
-       header_size = sizeof(*p) - sizeof(*h);
-       data_size   = h->length  - header_size;
-
-       if (drbd_recv(mdev, h->payload, header_size) != header_size)
-               return FALSE;
-
        p_proto         = be32_to_cpu(p->protocol);
        p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
        p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
@@ -2805,39 +2727,46 @@ struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
        return tfm;
 }
 
-static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
+static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
 {
        int ok = TRUE;
-       struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
+       struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
        unsigned int header_size, data_size, exp_max_sz;
        struct crypto_hash *verify_tfm = NULL;
        struct crypto_hash *csums_tfm = NULL;
        const int apv = mdev->agreed_pro_version;
+       int *rs_plan_s = NULL;
+       int fifo_size = 0;
 
        exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
                    : apv == 88 ? sizeof(struct p_rs_param)
                                        + SHARED_SECRET_MAX
-                   : /* 89 */    sizeof(struct p_rs_param_89);
+                   : apv <= 94 ? sizeof(struct p_rs_param_89)
+                   : /* apv >= 95 */ sizeof(struct p_rs_param_95);
 
-       if (h->length > exp_max_sz) {
+       if (packet_size > exp_max_sz) {
                dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
-                   h->length, exp_max_sz);
+                   packet_size, exp_max_sz);
                return FALSE;
        }
 
        if (apv <= 88) {
-               header_size = sizeof(struct p_rs_param) - sizeof(*h);
-               data_size   = h->length  - header_size;
-       } else /* apv >= 89 */ {
-               header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
-               data_size   = h->length  - header_size;
+               header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
+               data_size   = packet_size  - header_size;
+       } else if (apv <= 94) {
+               header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
+               data_size   = packet_size  - header_size;
+               D_ASSERT(data_size == 0);
+       } else {
+               header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
+               data_size   = packet_size  - header_size;
                D_ASSERT(data_size == 0);
        }
 
        /* initialize verify_alg and csums_alg */
        memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
 
-       if (drbd_recv(mdev, h->payload, header_size) != header_size)
+       if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
                return FALSE;
 
        mdev->sync_conf.rate      = be32_to_cpu(p->rate);
@@ -2896,6 +2825,22 @@ static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
                        }
                }
 
+               if (apv > 94) {
+                       mdev->sync_conf.rate      = be32_to_cpu(p->rate);
+                       mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
+                       mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
+                       mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
+                       mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
+
+                       fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
+                       if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
+                               rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
+                               if (!rs_plan_s) {
+                                       dev_err(DEV, "kmalloc of fifo_buffer failed");
+                                       goto disconnect;
+                               }
+                       }
+               }
 
                spin_lock(&mdev->peer_seq_lock);
                /* lock against drbd_nl_syncer_conf() */
@@ -2913,6 +2858,12 @@ static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
                        mdev->csums_tfm = csums_tfm;
                        dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
                }
+               if (fifo_size != mdev->rs_plan_s.size) {
+                       kfree(mdev->rs_plan_s.values);
+                       mdev->rs_plan_s.values = rs_plan_s;
+                       mdev->rs_plan_s.size   = fifo_size;
+                       mdev->rs_planed = 0;
+               }
                spin_unlock(&mdev->peer_seq_lock);
        }
 
@@ -2946,19 +2897,15 @@ static void warn_if_differ_considerably(struct drbd_conf *mdev,
                     (unsigned long long)a, (unsigned long long)b);
 }
 
-static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
+static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-       struct p_sizes *p = (struct p_sizes *)h;
+       struct p_sizes *p = &mdev->data.rbuf.sizes;
        enum determine_dev_size dd = unchanged;
        unsigned int max_seg_s;
        sector_t p_size, p_usize, my_usize;
        int ldsc = 0; /* local disk size changed */
        enum dds_flags ddsf;
 
-       ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
-       if (drbd_recv(mdev, h->payload, h->length) != h->length)
-               return FALSE;
-
        p_size = be64_to_cpu(p->d_size);
        p_usize = be64_to_cpu(p->u_size);
 
@@ -2972,7 +2919,6 @@ static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
         * we still need to figure out whether we accept that. */
        mdev->p_size = p_size;
 
-#define min_not_zero(l, r) (l == 0) ? r : ((r == 0) ? l : min(l, r))
        if (get_ldev(mdev)) {
                warn_if_differ_considerably(mdev, "lower level device sizes",
                           p_size, drbd_get_max_capacity(mdev->ldev));
@@ -3029,6 +2975,8 @@ static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
 
                if (mdev->agreed_pro_version < 94)
                        max_seg_s = be32_to_cpu(p->max_segment_size);
+               else if (mdev->agreed_pro_version == 94)
+                       max_seg_s = DRBD_MAX_SIZE_H80_PACKET;
                else /* drbd 8.3.8 onwards */
                        max_seg_s = DRBD_MAX_SEGMENT_SIZE;
 
@@ -3062,16 +3010,12 @@ static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
+static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-       struct p_uuids *p = (struct p_uuids *)h;
+       struct p_uuids *p = &mdev->data.rbuf.uuids;
        u64 *p_uuid;
        int i;
 
-       ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
-       if (drbd_recv(mdev, h->payload, h->length) != h->length)
-               return FALSE;
-
        p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
 
        for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
@@ -3107,6 +3051,11 @@ static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
                        drbd_md_sync(mdev);
                }
                put_ldev(mdev);
+       } else if (mdev->state.disk < D_INCONSISTENT &&
+                  mdev->state.role == R_PRIMARY) {
+               /* I am a diskless primary, the peer just created a new current UUID
+                  for me. */
+               drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
        }
 
        /* Before we test for the disk state, we should wait until an eventually
@@ -3150,16 +3099,12 @@ static union drbd_state convert_state(union drbd_state ps)
        return ms;
 }
 
-static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
+static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-       struct p_req_state *p = (struct p_req_state *)h;
+       struct p_req_state *p = &mdev->data.rbuf.req_state;
        union drbd_state mask, val;
        int rv;
 
-       ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
-       if (drbd_recv(mdev, h->payload, h->length) != h->length)
-               return FALSE;
-
        mask.i = be32_to_cpu(p->mask);
        val.i = be32_to_cpu(p->val);
 
@@ -3180,20 +3125,14 @@ static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int receive_state(struct drbd_conf *mdev, struct p_header *h)
+static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-       struct p_state *p = (struct p_state *)h;
-       enum drbd_conns nconn, oconn;
-       union drbd_state ns, peer_state;
+       struct p_state *p = &mdev->data.rbuf.state;
+       union drbd_state os, ns, peer_state;
        enum drbd_disk_state real_peer_disk;
+       enum chg_state_flags cs_flags;
        int rv;
 
-       ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
-               return FALSE;
-
-       if (drbd_recv(mdev, h->payload, h->length) != h->length)
-               return FALSE;
-
        peer_state.i = be32_to_cpu(p->state);
 
        real_peer_disk = peer_state.disk;
@@ -3204,40 +3143,74 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h)
 
        spin_lock_irq(&mdev->req_lock);
  retry:
-       oconn = nconn = mdev->state.conn;
+       os = ns = mdev->state;
        spin_unlock_irq(&mdev->req_lock);
 
-       if (nconn == C_WF_REPORT_PARAMS)
-               nconn = C_CONNECTED;
+       /* peer says his disk is uptodate, while we think it is inconsistent,
+        * and this happens while we think we have a sync going on. */
+       if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
+           os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
+               /* If we are (becoming) SyncSource, but peer is still in sync
+                * preparation, ignore its uptodate-ness to avoid flapping, it
+                * will change to inconsistent once the peer reaches active
+                * syncing states.
+                * It may have changed syncer-paused flags, however, so we
+                * cannot ignore this completely. */
+               if (peer_state.conn > C_CONNECTED &&
+                   peer_state.conn < C_SYNC_SOURCE)
+                       real_peer_disk = D_INCONSISTENT;
+
+               /* if peer_state changes to connected at the same time,
+                * it explicitly notifies us that it finished resync.
+                * Maybe we should finish it up, too? */
+               else if (os.conn >= C_SYNC_SOURCE &&
+                        peer_state.conn == C_CONNECTED) {
+                       if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
+                               drbd_resync_finished(mdev);
+                       return TRUE;
+               }
+       }
+
+       /* peer says his disk is inconsistent, while we think it is uptodate,
+        * and this happens while the peer still thinks we have a sync going on,
+        * but we think we are already done with the sync.
+        * We ignore this to avoid flapping pdsk.
+        * This should not happen, if the peer is a recent version of drbd. */
+       if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
+           os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
+               real_peer_disk = D_UP_TO_DATE;
+
+       if (ns.conn == C_WF_REPORT_PARAMS)
+               ns.conn = C_CONNECTED;
 
        if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
            get_ldev_if_state(mdev, D_NEGOTIATING)) {
                int cr; /* consider resync */
 
                /* if we established a new connection */
-               cr  = (oconn < C_CONNECTED);
+               cr  = (os.conn < C_CONNECTED);
                /* if we had an established connection
                 * and one of the nodes newly attaches a disk */
-               cr |= (oconn == C_CONNECTED &&
+               cr |= (os.conn == C_CONNECTED &&
                       (peer_state.disk == D_NEGOTIATING ||
-                       mdev->state.disk == D_NEGOTIATING));
+                       os.disk == D_NEGOTIATING));
                /* if we have both been inconsistent, and the peer has been
                 * forced to be UpToDate with --overwrite-data */
                cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
                /* if we had been plain connected, and the admin requested to
                 * start a sync by "invalidate" or "invalidate-remote" */
-               cr |= (oconn == C_CONNECTED &&
+               cr |= (os.conn == C_CONNECTED &&
                                (peer_state.conn >= C_STARTING_SYNC_S &&
                                 peer_state.conn <= C_WF_BITMAP_T));
 
                if (cr)
-                       nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
+                       ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
 
                put_ldev(mdev);
-               if (nconn == C_MASK) {
-                       nconn = C_CONNECTED;
+               if (ns.conn == C_MASK) {
+                       ns.conn = C_CONNECTED;
                        if (mdev->state.disk == D_NEGOTIATING) {
-                               drbd_force_state(mdev, NS(disk, D_DISKLESS));
+                               drbd_force_state(mdev, NS(disk, D_FAILED));
                        } else if (peer_state.disk == D_NEGOTIATING) {
                                dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
                                peer_state.disk = D_DISKLESS;
@@ -3245,7 +3218,7 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h)
                        } else {
                                if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
                                        return FALSE;
-                               D_ASSERT(oconn == C_WF_REPORT_PARAMS);
+                               D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
                                drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
                                return FALSE;
                        }
@@ -3253,18 +3226,28 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h)
        }
 
        spin_lock_irq(&mdev->req_lock);
-       if (mdev->state.conn != oconn)
+       if (mdev->state.i != os.i)
                goto retry;
        clear_bit(CONSIDER_RESYNC, &mdev->flags);
-       ns.i = mdev->state.i;
-       ns.conn = nconn;
        ns.peer = peer_state.role;
        ns.pdsk = real_peer_disk;
        ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
-       if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
+       if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
                ns.disk = mdev->new_state_tmp.disk;
-
-       rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
+       cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
+       if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
+           test_bit(NEW_CUR_UUID, &mdev->flags)) {
+               /* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
+                  for temporal network outages! */
+               spin_unlock_irq(&mdev->req_lock);
+               dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
+               tl_clear(mdev);
+               drbd_uuid_new_current(mdev);
+               clear_bit(NEW_CUR_UUID, &mdev->flags);
+               drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
+               return FALSE;
+       }
+       rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
        ns = mdev->state;
        spin_unlock_irq(&mdev->req_lock);
 
@@ -3273,8 +3256,8 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h)
                return FALSE;
        }
 
-       if (oconn > C_WF_REPORT_PARAMS) {
-               if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
+       if (os.conn > C_WF_REPORT_PARAMS) {
+               if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
                    peer_state.disk != D_NEGOTIATING ) {
                        /* we want resync, peer has not yet decided to sync... */
                        /* Nowadays only used when forcing a node into primary role and
@@ -3291,9 +3274,9 @@ static int receive_state(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
+static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-       struct p_rs_uuid *p = (struct p_rs_uuid *)h;
+       struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
 
        wait_event(mdev->misc_wait,
                   mdev->state.conn == C_WF_SYNC_UUID ||
@@ -3302,10 +3285,6 @@ static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
 
        /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
 
-       ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
-       if (drbd_recv(mdev, h->payload, h->length) != h->length)
-               return FALSE;
-
        /* Here the _drbd_uuid_ functions are right, current should
           _not_ be rotated into the history */
        if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
@@ -3324,14 +3303,14 @@ static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
 enum receive_bitmap_ret { OK, DONE, FAILED };
 
 static enum receive_bitmap_ret
-receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
-       unsigned long *buffer, struct bm_xfer_ctx *c)
+receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
+                    unsigned long *buffer, struct bm_xfer_ctx *c)
 {
        unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
        unsigned want = num_words * sizeof(long);
 
-       if (want != h->length) {
-               dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
+       if (want != data_size) {
+               dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
                return FAILED;
        }
        if (want == 0)
@@ -3360,7 +3339,7 @@ recv_bm_rle_bits(struct drbd_conf *mdev,
        u64 tmp;
        unsigned long s = c->bit_offset;
        unsigned long e;
-       int len = p->head.length - (sizeof(*p) - sizeof(p->head));
+       int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
        int toggle = DCBP_get_start(p);
        int have;
        int bits;
@@ -3429,7 +3408,7 @@ void INFO_bm_xfer_stats(struct drbd_conf *mdev,
                const char *direction, struct bm_xfer_ctx *c)
 {
        /* what would it take to transfer it "plaintext" */
-       unsigned plain = sizeof(struct p_header) *
+       unsigned plain = sizeof(struct p_header80) *
                ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
                + c->bm_words * sizeof(long);
        unsigned total = c->bytes[0] + c->bytes[1];
@@ -3467,12 +3446,13 @@ void INFO_bm_xfer_stats(struct drbd_conf *mdev,
    in order to be agnostic to the 32 vs 64 bits issue.
 
    returns 0 on failure, 1 if we successfully received it. */
-static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
+static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        struct bm_xfer_ctx c;
        void *buffer;
        enum receive_bitmap_ret ret;
        int ok = FALSE;
+       struct p_header80 *h = &mdev->data.rbuf.header.h80;
 
        wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
 
@@ -3492,39 +3472,39 @@ static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
        };
 
        do {
-               if (h->command == P_BITMAP) {
-                       ret = receive_bitmap_plain(mdev, h, buffer, &c);
-               } else if (h->command == P_COMPRESSED_BITMAP) {
+               if (cmd == P_BITMAP) {
+                       ret = receive_bitmap_plain(mdev, data_size, buffer, &c);
+               } else if (cmd == P_COMPRESSED_BITMAP) {
                        /* MAYBE: sanity check that we speak proto >= 90,
                         * and the feature is enabled! */
                        struct p_compressed_bm *p;
 
-                       if (h->length > BM_PACKET_PAYLOAD_BYTES) {
+                       if (data_size > BM_PACKET_PAYLOAD_BYTES) {
                                dev_err(DEV, "ReportCBitmap packet too large\n");
                                goto out;
                        }
                        /* use the page buff */
                        p = buffer;
                        memcpy(p, h, sizeof(*h));
-                       if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
+                       if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
                                goto out;
-                       if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
-                               dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
+                       if (data_size <= (sizeof(*p) - sizeof(p->head))) {
+                               dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
                                return FAILED;
                        }
                        ret = decode_bitmap_c(mdev, p, &c);
                } else {
-                       dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)", h->command);
+                       dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
                        goto out;
                }
 
-               c.packets[h->command == P_BITMAP]++;
-               c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
+               c.packets[cmd == P_BITMAP]++;
+               c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
 
                if (ret != OK)
                        break;
 
-               if (!drbd_recv_header(mdev, h))
+               if (!drbd_recv_header(mdev, &cmd, &data_size))
                        goto out;
        } while (ret == OK);
        if (ret == FAILED)
@@ -3555,17 +3535,16 @@ static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
        return ok;
 }
 
-static int receive_skip_(struct drbd_conf *mdev, struct p_header *h, int silent)
+static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        /* TODO zero copy sink :) */
        static char sink[128];
        int size, want, r;
 
-       if (!silent)
-               dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
-                    h->command, h->length);
+       dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
+                cmd, data_size);
 
-       size = h->length;
+       size = data_size;
        while (size > 0) {
                want = min_t(int, size, sizeof(sink));
                r = drbd_recv(mdev, sink, want);
@@ -3575,17 +3554,7 @@ static int receive_skip_(struct drbd_conf *mdev, struct p_header *h, int silent)
        return size == 0;
 }
 
-static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
-{
-       return receive_skip_(mdev, h, 0);
-}
-
-static int receive_skip_silent(struct drbd_conf *mdev, struct p_header *h)
-{
-       return receive_skip_(mdev, h, 1);
-}
-
-static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
+static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
        if (mdev->state.disk >= D_INCONSISTENT)
                drbd_kick_lo(mdev);
@@ -3597,108 +3566,96 @@ static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
-
-static drbd_cmd_handler_f drbd_default_handler[] = {
-       [P_DATA]            = receive_Data,
-       [P_DATA_REPLY]      = receive_DataReply,
-       [P_RS_DATA_REPLY]   = receive_RSDataReply,
-       [P_BARRIER]         = receive_Barrier,
-       [P_BITMAP]          = receive_bitmap,
-       [P_COMPRESSED_BITMAP]    = receive_bitmap,
-       [P_UNPLUG_REMOTE]   = receive_UnplugRemote,
-       [P_DATA_REQUEST]    = receive_DataRequest,
-       [P_RS_DATA_REQUEST] = receive_DataRequest,
-       [P_SYNC_PARAM]      = receive_SyncParam,
-       [P_SYNC_PARAM89]           = receive_SyncParam,
-       [P_PROTOCOL]        = receive_protocol,
-       [P_UUIDS]           = receive_uuids,
-       [P_SIZES]           = receive_sizes,
-       [P_STATE]           = receive_state,
-       [P_STATE_CHG_REQ]   = receive_req_state,
-       [P_SYNC_UUID]       = receive_sync_uuid,
-       [P_OV_REQUEST]      = receive_DataRequest,
-       [P_OV_REPLY]        = receive_DataRequest,
-       [P_CSUM_RS_REQUEST]    = receive_DataRequest,
-       [P_DELAY_PROBE]     = receive_skip_silent,
+typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
+
+struct data_cmd {
+       int expect_payload;
+       size_t pkt_size;
+       drbd_cmd_handler_f function;
+};
+
+static struct data_cmd drbd_cmd_handler[] = {
+       [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
+       [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
+       [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
+       [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
+       [P_BITMAP]          = { 1, sizeof(struct p_header80), receive_bitmap } ,
+       [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
+       [P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header80), receive_UnplugRemote },
+       [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
+       [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
+       [P_SYNC_PARAM]      = { 1, sizeof(struct p_header80), receive_SyncParam },
+       [P_SYNC_PARAM89]    = { 1, sizeof(struct p_header80), receive_SyncParam },
+       [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
+       [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
+       [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
+       [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
+       [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
+       [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
+       [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
+       [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
+       [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
+       [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
        /* anything missing from this table is in
         * the asender_tbl, see get_asender_cmd */
-       [P_MAX_CMD]         = NULL,
+       [P_MAX_CMD]         = { 0, 0, NULL },
 };
 
-static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
-static drbd_cmd_handler_f *drbd_opt_cmd_handler;
+/* All handler functions that expect a sub-header get that sub-heder in
+   mdev->data.rbuf.header.head.payload.
+
+   Usually in mdev->data.rbuf.header.head the callback can find the usual
+   p_header, but they may not rely on that. Since there is also p_header95 !
+ */
 
 static void drbdd(struct drbd_conf *mdev)
 {
-       drbd_cmd_handler_f handler;
-       struct p_header *header = &mdev->data.rbuf.header;
+       union p_header *header = &mdev->data.rbuf.header;
+       unsigned int packet_size;
+       enum drbd_packets cmd;
+       size_t shs; /* sub header size */
+       int rv;
 
        while (get_t_state(&mdev->receiver) == Running) {
                drbd_thread_current_set_cpu(mdev);
-               if (!drbd_recv_header(mdev, header)) {
-                       drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
-                       break;
-               }
+               if (!drbd_recv_header(mdev, &cmd, &packet_size))
+                       goto err_out;
 
-               if (header->command < P_MAX_CMD)
-                       handler = drbd_cmd_handler[header->command];
-               else if (P_MAY_IGNORE < header->command
-                    && header->command < P_MAX_OPT_CMD)
-                       handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
-               else if (header->command > P_MAX_OPT_CMD)
-                       handler = receive_skip;
-               else
-                       handler = NULL;
+               if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
+                       dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
+                       goto err_out;
+               }
 
-               if (unlikely(!handler)) {
-                       dev_err(DEV, "unknown packet type %d, l: %d!\n",
-                           header->command, header->length);
-                       drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
-                       break;
+               shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
+               if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
+                       dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
+                       goto err_out;
                }
-               if (unlikely(!handler(mdev, header))) {
-                       dev_err(DEV, "error receiving %s, l: %d!\n",
-                           cmdname(header->command), header->length);
-                       drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
-                       break;
+
+               if (shs) {
+                       rv = drbd_recv(mdev, &header->h80.payload, shs);
+                       if (unlikely(rv != shs)) {
+                               dev_err(DEV, "short read while reading sub header: rv=%d\n", rv);
+                               goto err_out;
+                       }
                }
-       }
-}
 
-static void drbd_fail_pending_reads(struct drbd_conf *mdev)
-{
-       struct hlist_head *slot;
-       struct hlist_node *pos;
-       struct hlist_node *tmp;
-       struct drbd_request *req;
-       int i;
+               rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
 
-       /*
-        * Application READ requests
-        */
-       spin_lock_irq(&mdev->req_lock);
-       for (i = 0; i < APP_R_HSIZE; i++) {
-               slot = mdev->app_reads_hash+i;
-               hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
-                       /* it may (but should not any longer!)
-                        * be on the work queue; if that assert triggers,
-                        * we need to also grab the
-                        * spin_lock_irq(&mdev->data.work.q_lock);
-                        * and list_del_init here. */
-                       D_ASSERT(list_empty(&req->w.list));
-                       /* It would be nice to complete outside of spinlock.
-                        * But this is easier for now. */
-                       _req_mod(req, connection_lost_while_pending);
+               if (unlikely(!rv)) {
+                       dev_err(DEV, "error receiving %s, l: %d!\n",
+                           cmdname(cmd), packet_size);
+                       goto err_out;
                }
        }
-       for (i = 0; i < APP_R_HSIZE; i++)
-               if (!hlist_empty(mdev->app_reads_hash+i))
-                       dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
-                               "%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
 
-       memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
-       spin_unlock_irq(&mdev->req_lock);
+       if (0) {
+       err_out:
+               drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
+       }
+       /* If we leave here, we probably want to update at least the
+        * "Connected" indicator on stable storage. Do so explicitly here. */
+       drbd_md_sync(mdev);
 }
 
 void drbd_flush_workqueue(struct drbd_conf *mdev)
@@ -3711,6 +3668,36 @@ void drbd_flush_workqueue(struct drbd_conf *mdev)
        wait_for_completion(&barr.done);
 }
 
+void drbd_free_tl_hash(struct drbd_conf *mdev)
+{
+       struct hlist_head *h;
+
+       spin_lock_irq(&mdev->req_lock);
+
+       if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
+               spin_unlock_irq(&mdev->req_lock);
+               return;
+       }
+       /* paranoia code */
+       for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
+               if (h->first)
+                       dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
+                               (int)(h - mdev->ee_hash), h->first);
+       kfree(mdev->ee_hash);
+       mdev->ee_hash = NULL;
+       mdev->ee_hash_s = 0;
+
+       /* paranoia code */
+       for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
+               if (h->first)
+                       dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
+                               (int)(h - mdev->tl_hash), h->first);
+       kfree(mdev->tl_hash);
+       mdev->tl_hash = NULL;
+       mdev->tl_hash_s = 0;
+       spin_unlock_irq(&mdev->req_lock);
+}
+
 static void drbd_disconnect(struct drbd_conf *mdev)
 {
        enum drbd_fencing_p fp;
@@ -3728,6 +3715,7 @@ static void drbd_disconnect(struct drbd_conf *mdev)
        drbd_thread_stop(&mdev->asender);
        drbd_free_sock(mdev);
 
+       /* wait for current activity to cease. */
        spin_lock_irq(&mdev->req_lock);
        _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
        _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
@@ -3752,7 +3740,6 @@ static void drbd_disconnect(struct drbd_conf *mdev)
 
        /* make sure syncer is stopped and w_resume_next_sg queued */
        del_timer_sync(&mdev->resync_timer);
-       set_bit(STOP_SYNC_TIMER, &mdev->flags);
        resync_timer_fn((unsigned long)mdev);
 
        /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
@@ -3767,11 +3754,9 @@ static void drbd_disconnect(struct drbd_conf *mdev)
        kfree(mdev->p_uuid);
        mdev->p_uuid = NULL;
 
-       if (!mdev->state.susp)
+       if (!is_susp(mdev->state))
                tl_clear(mdev);
 
-       drbd_fail_pending_reads(mdev);
-
        dev_info(DEV, "Connection closed\n");
 
        drbd_md_sync(mdev);
@@ -3782,12 +3767,8 @@ static void drbd_disconnect(struct drbd_conf *mdev)
                put_ldev(mdev);
        }
 
-       if (mdev->state.role == R_PRIMARY) {
-               if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
-                       enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
-                       drbd_request_state(mdev, NS(pdsk, nps));
-               }
-       }
+       if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
+               drbd_try_outdate_peer_async(mdev);
 
        spin_lock_irq(&mdev->req_lock);
        os = mdev->state;
@@ -3800,32 +3781,14 @@ static void drbd_disconnect(struct drbd_conf *mdev)
        spin_unlock_irq(&mdev->req_lock);
 
        if (os.conn == C_DISCONNECTING) {
-               struct hlist_head *h;
-               wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
-
-               /* we must not free the tl_hash
-                * while application io is still on the fly */
-               wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
+               wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
 
-               spin_lock_irq(&mdev->req_lock);
-               /* paranoia code */
-               for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
-                       if (h->first)
-                               dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
-                                               (int)(h - mdev->ee_hash), h->first);
-               kfree(mdev->ee_hash);
-               mdev->ee_hash = NULL;
-               mdev->ee_hash_s = 0;
-
-               /* paranoia code */
-               for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
-                       if (h->first)
-                               dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
-                                               (int)(h - mdev->tl_hash), h->first);
-               kfree(mdev->tl_hash);
-               mdev->tl_hash = NULL;
-               mdev->tl_hash_s = 0;
-               spin_unlock_irq(&mdev->req_lock);
+               if (!is_susp(mdev->state)) {
+                       /* we must not free the tl_hash
+                        * while application io is still on the fly */
+                       wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
+                       drbd_free_tl_hash(mdev);
+               }
 
                crypto_free_hash(mdev->cram_hmac_tfm);
                mdev->cram_hmac_tfm = NULL;
@@ -3845,6 +3808,9 @@ static void drbd_disconnect(struct drbd_conf *mdev)
        i = drbd_release_ee(mdev, &mdev->net_ee);
        if (i)
                dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
+       i = atomic_read(&mdev->pp_in_use_by_net);
+       if (i)
+               dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
        i = atomic_read(&mdev->pp_in_use);
        if (i)
                dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
@@ -3888,7 +3854,7 @@ static int drbd_send_handshake(struct drbd_conf *mdev)
        p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
        p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
        ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
-                            (struct p_header *)p, sizeof(*p), 0 );
+                            (struct p_header80 *)p, sizeof(*p), 0 );
        mutex_unlock(&mdev->data.mutex);
        return ok;
 }
@@ -3904,27 +3870,28 @@ static int drbd_do_handshake(struct drbd_conf *mdev)
 {
        /* ASSERT current == mdev->receiver ... */
        struct p_handshake *p = &mdev->data.rbuf.handshake;
-       const int expect = sizeof(struct p_handshake)
-                         -sizeof(struct p_header);
+       const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
+       unsigned int length;
+       enum drbd_packets cmd;
        int rv;
 
        rv = drbd_send_handshake(mdev);
        if (!rv)
                return 0;
 
-       rv = drbd_recv_header(mdev, &p->head);
+       rv = drbd_recv_header(mdev, &cmd, &length);
        if (!rv)
                return 0;
 
-       if (p->head.command != P_HAND_SHAKE) {
+       if (cmd != P_HAND_SHAKE) {
                dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
-                    cmdname(p->head.command), p->head.command);
+                    cmdname(cmd), cmd);
                return -1;
        }
 
-       if (p->head.length != expect) {
+       if (length != expect) {
                dev_err(DEV, "expected HandShake length: %u, received: %u\n",
-                    expect, p->head.length);
+                    expect, length);
                return -1;
        }
 
@@ -3982,10 +3949,11 @@ static int drbd_do_auth(struct drbd_conf *mdev)
        char *response = NULL;
        char *right_response = NULL;
        char *peers_ch = NULL;
-       struct p_header p;
        unsigned int key_len = strlen(mdev->net_conf->shared_secret);
        unsigned int resp_size;
        struct hash_desc desc;
+       enum drbd_packets cmd;
+       unsigned int length;
        int rv;
 
        desc.tfm = mdev->cram_hmac_tfm;
@@ -4005,33 +3973,33 @@ static int drbd_do_auth(struct drbd_conf *mdev)
        if (!rv)
                goto fail;
 
-       rv = drbd_recv_header(mdev, &p);
+       rv = drbd_recv_header(mdev, &cmd, &length);
        if (!rv)
                goto fail;
 
-       if (p.command != P_AUTH_CHALLENGE) {
+       if (cmd != P_AUTH_CHALLENGE) {
                dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
-                   cmdname(p.command), p.command);
+                   cmdname(cmd), cmd);
                rv = 0;
                goto fail;
        }
 
-       if (p.length > CHALLENGE_LEN*2) {
+       if (length > CHALLENGE_LEN * 2) {
                dev_err(DEV, "expected AuthChallenge payload too big.\n");
                rv = -1;
                goto fail;
        }
 
-       peers_ch = kmalloc(p.length, GFP_NOIO);
+       peers_ch = kmalloc(length, GFP_NOIO);
        if (peers_ch == NULL) {
                dev_err(DEV, "kmalloc of peers_ch failed\n");
                rv = -1;
                goto fail;
        }
 
-       rv = drbd_recv(mdev, peers_ch, p.length);
+       rv = drbd_recv(mdev, peers_ch, length);
 
-       if (rv != p.length) {
+       if (rv != length) {
                dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
                rv = 0;
                goto fail;
@@ -4046,7 +4014,7 @@ static int drbd_do_auth(struct drbd_conf *mdev)
        }
 
        sg_init_table(&sg, 1);
-       sg_set_buf(&sg, peers_ch, p.length);
+       sg_set_buf(&sg, peers_ch, length);
 
        rv = crypto_hash_digest(&desc, &sg, sg.length, response);
        if (rv) {
@@ -4059,18 +4027,18 @@ static int drbd_do_auth(struct drbd_conf *mdev)
        if (!rv)
                goto fail;
 
-       rv = drbd_recv_header(mdev, &p);
+       rv = drbd_recv_header(mdev, &cmd, &length);
        if (!rv)
                goto fail;
 
-       if (p.command != P_AUTH_RESPONSE) {
+       if (cmd != P_AUTH_RESPONSE) {
                dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
-                   cmdname(p.command), p.command);
+                       cmdname(cmd), cmd);
                rv = 0;
                goto fail;
        }
 
-       if (p.length != resp_size) {
+       if (length != resp_size) {
                dev_err(DEV, "expected AuthResponse payload of wrong size\n");
                rv = 0;
                goto fail;
@@ -4155,7 +4123,7 @@ int drbdd_init(struct drbd_thread *thi)
 
 /* ********* acknowledge sender ******** */
 
-static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
+static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_req_state_reply *p = (struct p_req_state_reply *)h;
 
@@ -4173,13 +4141,13 @@ static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
+static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
 {
        return drbd_send_ping_ack(mdev);
 
 }
 
-static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
+static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
 {
        /* restore idle timeout */
        mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
@@ -4189,7 +4157,7 @@ static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
+static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_block_ack *p = (struct p_block_ack *)h;
        sector_t sector = be64_to_cpu(p->sector);
@@ -4199,11 +4167,15 @@ static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
 
        update_peer_seq(mdev, be32_to_cpu(p->seq_num));
 
-       drbd_rs_complete_io(mdev, sector);
-       drbd_set_in_sync(mdev, sector, blksize);
-       /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
-       mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
+       if (get_ldev(mdev)) {
+               drbd_rs_complete_io(mdev, sector);
+               drbd_set_in_sync(mdev, sector, blksize);
+               /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
+               mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
+               put_ldev(mdev);
+       }
        dec_rs_pending(mdev);
+       atomic_add(blksize >> 9, &mdev->rs_sect_in);
 
        return TRUE;
 }
@@ -4259,7 +4231,7 @@ static int validate_req_change_req_state(struct drbd_conf *mdev,
        return TRUE;
 }
 
-static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
+static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_block_ack *p = (struct p_block_ack *)h;
        sector_t sector = be64_to_cpu(p->sector);
@@ -4299,7 +4271,7 @@ static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
                _ack_id_to_req, __func__ , what);
 }
 
-static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
+static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_block_ack *p = (struct p_block_ack *)h;
        sector_t sector = be64_to_cpu(p->sector);
@@ -4319,7 +4291,7 @@ static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
                _ack_id_to_req, __func__ , neg_acked);
 }
 
-static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
+static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_block_ack *p = (struct p_block_ack *)h;
        sector_t sector = be64_to_cpu(p->sector);
@@ -4332,7 +4304,7 @@ static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
                _ar_id_to_req, __func__ , neg_acked);
 }
 
-static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
+static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
 {
        sector_t sector;
        int size;
@@ -4354,7 +4326,7 @@ static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
+static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_barrier_ack *p = (struct p_barrier_ack *)h;
 
@@ -4363,7 +4335,7 @@ static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
        return TRUE;
 }
 
-static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
+static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
 {
        struct p_block_ack *p = (struct p_block_ack *)h;
        struct drbd_work *w;
@@ -4380,6 +4352,9 @@ static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
        else
                ov_oos_print(mdev);
 
+       if (!get_ldev(mdev))
+               return TRUE;
+
        drbd_rs_complete_io(mdev, sector);
        dec_rs_pending(mdev);
 
@@ -4394,18 +4369,18 @@ static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
                        drbd_resync_finished(mdev);
                }
        }
+       put_ldev(mdev);
        return TRUE;
 }
 
-static int got_something_to_ignore_m(struct drbd_conf *mdev, struct p_header *h)
+static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
 {
-       /* IGNORE */
        return TRUE;
 }
 
 struct asender_cmd {
        size_t pkt_size;
-       int (*process)(struct drbd_conf *mdev, struct p_header *h);
+       int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
 };
 
 static struct asender_cmd *get_asender_cmd(int cmd)
@@ -4414,8 +4389,8 @@ static struct asender_cmd *get_asender_cmd(int cmd)
                /* anything missing from this table is in
                 * the drbd_cmd_handler (drbd_default_handler) table,
                 * see the beginning of drbdd() */
-       [P_PING]            = { sizeof(struct p_header), got_Ping },
-       [P_PING_ACK]        = { sizeof(struct p_header), got_PingAck },
+       [P_PING]            = { sizeof(struct p_header80), got_Ping },
+       [P_PING_ACK]        = { sizeof(struct p_header80), got_PingAck },
        [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
        [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
        [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
@@ -4427,7 +4402,7 @@ static struct asender_cmd *get_asender_cmd(int cmd)
        [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
        [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
        [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
-       [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe), got_something_to_ignore_m },
+       [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
        [P_MAX_CMD]         = { 0, NULL },
        };
        if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
@@ -4438,13 +4413,13 @@ static struct asender_cmd *get_asender_cmd(int cmd)
 int drbd_asender(struct drbd_thread *thi)
 {
        struct drbd_conf *mdev = thi->mdev;
-       struct p_header *h = &mdev->meta.rbuf.header;
+       struct p_header80 *h = &mdev->meta.rbuf.header.h80;
        struct asender_cmd *cmd = NULL;
 
        int rv, len;
        void *buf    = h;
        int received = 0;
-       int expect   = sizeof(struct p_header);
+       int expect   = sizeof(struct p_header80);
        int empty;
 
        sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
@@ -4468,10 +4443,8 @@ int drbd_asender(struct drbd_thread *thi)
                while (1) {
                        clear_bit(SIGNAL_ASENDER, &mdev->flags);
                        flush_signals(current);
-                       if (!drbd_process_done_ee(mdev)) {
-                               dev_err(DEV, "process_done_ee() = NOT_OK\n");
+                       if (!drbd_process_done_ee(mdev))
                                goto reconnect;
-                       }
                        /* to avoid race with newly queued ACKs */
                        set_bit(SIGNAL_ASENDER, &mdev->flags);
                        spin_lock_irq(&mdev->req_lock);
@@ -4530,21 +4503,23 @@ int drbd_asender(struct drbd_thread *thi)
 
                if (received == expect && cmd == NULL) {
                        if (unlikely(h->magic != BE_DRBD_MAGIC)) {
-                               dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
-                                   (long)be32_to_cpu(h->magic),
-                                   h->command, h->length);
+                               dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
+                                   be32_to_cpu(h->magic),
+                                   be16_to_cpu(h->command),
+                                   be16_to_cpu(h->length));
                                goto reconnect;
                        }
                        cmd = get_asender_cmd(be16_to_cpu(h->command));
                        len = be16_to_cpu(h->length);
                        if (unlikely(cmd == NULL)) {
-                               dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
-                                   (long)be32_to_cpu(h->magic),
-                                   h->command, h->length);
+                               dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
+                                   be32_to_cpu(h->magic),
+                                   be16_to_cpu(h->command),
+                                   be16_to_cpu(h->length));
                                goto disconnect;
                        }
                        expect = cmd->pkt_size;
-                       ERR_IF(len != expect-sizeof(struct p_header))
+                       ERR_IF(len != expect-sizeof(struct p_header80))
                                goto reconnect;
                }
                if (received == expect) {
@@ -4554,7 +4529,7 @@ int drbd_asender(struct drbd_thread *thi)
 
                        buf      = h;
                        received = 0;
-                       expect   = sizeof(struct p_header);
+                       expect   = sizeof(struct p_header80);
                        cmd      = NULL;
                }
        }
@@ -4562,10 +4537,12 @@ int drbd_asender(struct drbd_thread *thi)
        if (0) {
 reconnect:
                drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
+               drbd_md_sync(mdev);
        }
        if (0) {
 disconnect:
                drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
+               drbd_md_sync(mdev);
        }
        clear_bit(SIGNAL_ASENDER, &mdev->flags);