]> git.karo-electronics.de Git - karo-tx-linux.git/blobdiff - include/linux/ptr_ring.h
Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/net-next
[karo-tx-linux.git] / include / linux / ptr_ring.h
index 6b2e0dd88569b13c66ef445609b2f12419fdd3ac..d8c97ec8a8e66d865f516f708a5b3f8fe567da29 100644 (file)
@@ -278,6 +278,22 @@ static inline void *__ptr_ring_consume(struct ptr_ring *r)
        return ptr;
 }
 
+static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
+                                            void **array, int n)
+{
+       void *ptr;
+       int i;
+
+       for (i = 0; i < n; i++) {
+               ptr = __ptr_ring_consume(r);
+               if (!ptr)
+                       break;
+               array[i] = ptr;
+       }
+
+       return i;
+}
+
 /*
  * Note: resize (below) nests producer lock within consumer lock, so if you
  * call this in interrupt or BH context, you must disable interrupts/BH when
@@ -328,6 +344,55 @@ static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
        return ptr;
 }
 
+static inline int ptr_ring_consume_batched(struct ptr_ring *r,
+                                          void **array, int n)
+{
+       int ret;
+
+       spin_lock(&r->consumer_lock);
+       ret = __ptr_ring_consume_batched(r, array, n);
+       spin_unlock(&r->consumer_lock);
+
+       return ret;
+}
+
+static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
+                                              void **array, int n)
+{
+       int ret;
+
+       spin_lock_irq(&r->consumer_lock);
+       ret = __ptr_ring_consume_batched(r, array, n);
+       spin_unlock_irq(&r->consumer_lock);
+
+       return ret;
+}
+
+static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
+                                              void **array, int n)
+{
+       unsigned long flags;
+       int ret;
+
+       spin_lock_irqsave(&r->consumer_lock, flags);
+       ret = __ptr_ring_consume_batched(r, array, n);
+       spin_unlock_irqrestore(&r->consumer_lock, flags);
+
+       return ret;
+}
+
+static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
+                                             void **array, int n)
+{
+       int ret;
+
+       spin_lock_bh(&r->consumer_lock);
+       ret = __ptr_ring_consume_batched(r, array, n);
+       spin_unlock_bh(&r->consumer_lock);
+
+       return ret;
+}
+
 /* Cast to structure type and call a function without discarding from FIFO.
  * Function must return a value.
  * Callers must take consumer_lock.
@@ -403,6 +468,61 @@ static inline int ptr_ring_init(struct ptr_ring *r, int size, gfp_t gfp)
        return 0;
 }
 
+/*
+ * Return entries into ring. Destroy entries that don't fit.
+ *
+ * Note: this is expected to be a rare slow path operation.
+ *
+ * Note: producer lock is nested within consumer lock, so if you
+ * resize you must make sure all uses nest correctly.
+ * In particular if you consume ring in interrupt or BH context, you must
+ * disable interrupts/BH when doing so.
+ */
+static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
+                                     void (*destroy)(void *))
+{
+       unsigned long flags;
+       int head;
+
+       spin_lock_irqsave(&r->consumer_lock, flags);
+       spin_lock(&r->producer_lock);
+
+       if (!r->size)
+               goto done;
+
+       /*
+        * Clean out buffered entries (for simplicity). This way following code
+        * can test entries for NULL and if not assume they are valid.
+        */
+       head = r->consumer_head - 1;
+       while (likely(head >= r->consumer_tail))
+               r->queue[head--] = NULL;
+       r->consumer_tail = r->consumer_head;
+
+       /*
+        * Go over entries in batch, start moving head back and copy entries.
+        * Stop when we run into previously unconsumed entries.
+        */
+       while (n) {
+               head = r->consumer_head - 1;
+               if (head < 0)
+                       head = r->size - 1;
+               if (r->queue[head]) {
+                       /* This batch entry will have to be destroyed. */
+                       goto done;
+               }
+               r->queue[head] = batch[--n];
+               r->consumer_tail = r->consumer_head = head;
+       }
+
+done:
+       /* Destroy all entries left in the batch. */
+       while (n)
+               destroy(batch[--n]);
+       spin_unlock(&r->producer_lock);
+       spin_unlock_irqrestore(&r->consumer_lock, flags);
+}
+
 static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
                                           int size, gfp_t gfp,
                                           void (*destroy)(void *))