/*-
 * Copyright (c) 2014 Chelsio Communications, Inc.
 * All rights reserved.
 * Written by: Navdeep Parhar <np@FreeBSD.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/types.h>
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/counter.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/malloc.h>
#include <machine/cpu.h>

#if defined(__powerpc__) || defined(__mips__)
#define NO_64BIT_ATOMICS
#endif

#ifdef NO_64BIT_ATOMICS
#define atomic_cmpset_acq_64 atomic_cmpset_64
#define atomic_cmpset_rel_64 atomic_cmpset_64
#endif

#include <net/mp_ring.h>
/*
 * The entire state of the ring (both producer indexes, the consumer index,
 * and the flags) fits in one 64-bit word so that it can be read and updated
 * with a single atomic compare-and-set.
 */
union ring_state {
	struct {
		uint16_t pidx_head;
		uint16_t pidx_tail;
		uint16_t cidx;
		uint16_t flags;
	};
	uint64_t state;
};

enum {
	IDLE = 0,	/* consumer ran to completion, nothing more to do. */
	BUSY,		/* consumer is running already, or will be shortly. */
	STALLED,	/* consumer stopped due to lack of resources. */
	ABDICATED,	/* consumer stopped even though there was work to be
			   done because it wants another thread to take over. */
};
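
/*
 * Consumer state machine, as implemented by the drain functions below: the
 * ring leaves IDLE when a producer enqueues work and that producer becomes
 * the consumer (BUSY).  The consumer returns to IDLE when it catches up with
 * pidx_tail, moves to STALLED when the drain callback makes no progress, and
 * moves to ABDICATED when it stops with work still pending so that a later
 * enqueueing thread takes over as consumer.
 */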
static inline uint16_t
space_available(struct ifmp_ring *r, union ring_state s)
{
	uint16_t x = r->size - 1;

	if (s.cidx == s.pidx_head)
		return (x);
	else if (s.cidx > s.pidx_head)
		return (s.cidx - s.pidx_head - 1);
	else
		return (x - s.pidx_head + s.cidx);
}
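
/*
 * Worked example for space_available(): with size = 8 the ring holds at most
 * 7 items, because one slot is always left empty so that an empty ring
 * (cidx == pidx_head) is distinguishable from a full one.  For cidx = 2 and
 * pidx_head = 6 the wrap-around case applies: 7 - 6 + 2 = 3 free slots.
 */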
static inline uint16_t
increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
{
	int x = r->size - idx;

	MPASS(x > 0);
	return (x > n ? idx + n : n - x);
}
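
/*
 * Worked example for increment_idx(): with size = 8, idx = 6, and n = 3,
 * x = 2 which is not greater than n, so the result is n - x = 1; the index
 * advances 6 -> 7 -> 0 -> 1, wrapping around the end of the ring.
 */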
/* Consumer is about to update the ring's state to s */
static inline uint16_t
state_to_flags(union ring_state s, int abdicate)
{

	if (s.cidx == s.pidx_tail)
		return (IDLE);
	else if (abdicate && s.pidx_tail != s.pidx_head)
		return (ABDICATED);

	return (BUSY);
}
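
/*
 * In other words: the consumer goes IDLE once it has caught up with every
 * published item, ABDICATES if it is allowed to stop early and another
 * producer is already mid-publication (that producer will see the ring is
 * not BUSY and take over as consumer), and otherwise remains BUSY.
 */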
#ifdef NO_64BIT_ATOMICS
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			/* No progress; mark the ring stalled and stop. */
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = STALLED;
			r->state = ns.state;
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;

		os.state = ns.state = r->state;
		ns.cidx = cidx;
		ns.flags = state_to_flags(ns, total >= budget);
		r->state = ns.state;

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The state update above, done while holding the ring lock,
		 * guarantees visibility of items associated with any pidx
		 * change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
#else
/*
 * Caller passes in a state, with a guarantee that there is work to do and that
 * all items up to the pidx_tail in the state are visible.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;
	total = 0;

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			/* No progress; mark the ring stalled and stop. */
			critical_enter();
			do {
				os.state = ns.state = r->state;
				ns.cidx = cidx;
				ns.flags = STALLED;
			} while (atomic_cmpset_64(&r->state, os.state,
			    ns.state) == 0);
			critical_exit();
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;
		critical_enter();
		do {
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = state_to_flags(ns, total >= budget);
		} while (atomic_cmpset_acq_64(&r->state, os.state, ns.state) == 0);
		critical_exit();

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of items
		 * associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
#endif
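
/*
 * Note on the drain loops above: the consumer batches its cidx updates (up to
 * 64 items, or until the caller's budget is exhausted) so that it does not
 * have to touch the shared state word on every drain callback.  In the
 * lockless variant the acquire on the state word pairs with the producers'
 * release-style update of pidx_tail, which is what makes newly published
 * items visible before they are drained.
 */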
int
ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
    mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
{
	struct ifmp_ring *r;

	/* All idx are 16b so size can be 65536 at most */
	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
	    can_drain == NULL)
		return (EINVAL);

	flags &= M_NOWAIT | M_WAITOK;
	MPASS(flags != 0);

	r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
	if (r == NULL)
		return (ENOMEM);
	r->size = size;
	r->cookie = cookie;
	r->mt = mt;
	r->drain = drain;
	r->can_drain = can_drain;
	r->enqueues = counter_u64_alloc(flags);
	r->drops = counter_u64_alloc(flags);
	r->starts = counter_u64_alloc(flags);
	r->stalls = counter_u64_alloc(flags);
	r->restarts = counter_u64_alloc(flags);
	r->abdications = counter_u64_alloc(flags);
	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
	    r->stalls == NULL || r->restarts == NULL ||
	    r->abdications == NULL) {
		ifmp_ring_free(r);
		return (ENOMEM);
	}

	*pr = r;
#ifdef NO_64BIT_ATOMICS
	mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
#endif
	return (0);
}
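
/*
 * Allocation sketch (illustrative only; "foo_drain", "foo_can_drain", and
 * "sc" are hypothetical callers, with the callback prototypes given by
 * mp_ring_drain_t and mp_ring_can_drain_t in net/mp_ring.h):
 *
 *	struct ifmp_ring *ring;
 *	int error;
 *
 *	error = ifmp_ring_alloc(&ring, 1024, sc, foo_drain, foo_can_drain,
 *	    M_DEVBUF, M_WAITOK);
 *
 * The drain callback consumes items in [cidx, pidx) and returns how many it
 * took; can_drain reports whether a stalled ring could make progress again.
 */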
void
ifmp_ring_free(struct ifmp_ring *r)
{

	if (r == NULL)
		return;

	if (r->enqueues != NULL)
		counter_u64_free(r->enqueues);
	if (r->drops != NULL)
		counter_u64_free(r->drops);
	if (r->starts != NULL)
		counter_u64_free(r->starts);
	if (r->stalls != NULL)
		counter_u64_free(r->stalls);
	if (r->restarts != NULL)
		counter_u64_free(r->restarts);
	if (r->abdications != NULL)
		counter_u64_free(r->abdications);

	free(r, r->mt);
}
/*
 * Enqueue n items and maybe drain the ring for some time.
 *
 * Returns an errno.
 */
#ifdef NO_64BIT_ATOMICS
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	mtx_lock(&r->lock);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	os.state = r->state;
	if (n >= space_available(r, os)) {
		counter_u64_add(r->drops, n);
		MPASS(os.flags != IDLE);
		mtx_unlock(&r->lock);
		if (os.flags == STALLED)
			ifmp_ring_check_drainage(r, 0);
		return (ENOBUFS);
	}
	ns.state = os.state;
	ns.pidx_head = increment_idx(r, os.pidx_head, n);
	r->state = ns.state;
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  With the ring lock held, the items
	 * are visible to any thread that sees the updated pidx.
	 */
	os.state = ns.state = r->state;
	ns.pidx_tail = pidx_stop;
	ns.flags = BUSY;
	r->state = ns.state;
	counter_u64_add(r->enqueues, n);

	/*
	 * Turn into a consumer if some other thread isn't active as a consumer
	 * already.
	 */
	if (os.flags != BUSY)
		drain_ring_locked(r, ns, os.flags, budget);

	mtx_unlock(&r->lock);
	return (0);
}
#else
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	for (;;) {
		os.state = r->state;
		if (n >= space_available(r, os)) {
			counter_u64_add(r->drops, n);
			MPASS(os.flags != IDLE);
			if (os.flags == STALLED)
				ifmp_ring_check_drainage(r, 0);
			return (ENOBUFS);
		}
		ns.state = os.state;
		ns.pidx_head = increment_idx(r, os.pidx_head, n);
		critical_enter();
		if (atomic_cmpset_64(&r->state, os.state, ns.state))
			break;
		critical_exit();
		cpu_spinwait();
	}
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 */
	do {
		os.state = ns.state = r->state;
		ns.pidx_tail = pidx_stop;
		ns.flags = BUSY;
	} while (atomic_cmpset_rel_64(&r->state, os.state, ns.state) == 0);
	critical_exit();
	counter_u64_add(r->enqueues, n);

	/*
	 * Turn into a consumer if some other thread isn't active as a consumer
	 * already.
	 */
	if (os.flags != BUSY)
		drain_ring_lockless(r, ns, os.flags, budget);

	return (0);
}
#endif
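
/*
 * Summary of the enqueue protocol implemented above: a producer (1) reserves
 * space by advancing pidx_head, (2) waits for earlier producers by spinning
 * until pidx_tail reaches the start of its reservation, (3) copies its items
 * into the reserved slots, (4) publishes them by advancing pidx_tail (with a
 * release-style compare-and-set in the lockless variant), and (5) becomes the
 * consumer if the ring was not already BUSY.
 */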
void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
	union ring_state os, ns;

	os.state = r->state;
	if (os.flags != STALLED || os.pidx_head != os.pidx_tail || r->can_drain(r) == 0)
		return;

	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
	ns.state = os.state;
	ns.flags = BUSY;

#ifdef NO_64BIT_ATOMICS
	mtx_lock(&r->lock);
	if (r->state != os.state) {
		mtx_unlock(&r->lock);
		return;
	}
	r->state = ns.state;
	drain_ring_locked(r, ns, os.flags, budget);
	mtx_unlock(&r->lock);
#else
	/*
	 * The acquire style atomic guarantees visibility of items associated
	 * with the pidx that we read here.
	 */
	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
		return;

	drain_ring_lockless(r, ns, os.flags, budget);
#endif
}
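
/*
 * ifmp_ring_check_drainage() is the recovery path for a STALLED ring: it is
 * called above when an enqueue fails on a stalled ring, and it only restarts
 * the consumer once can_drain() reports that progress is possible again and
 * no producer is mid-publication (pidx_head == pidx_tail).
 */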
void
ifmp_ring_reset_stats(struct ifmp_ring *r)
{

	counter_u64_zero(r->enqueues);
	counter_u64_zero(r->drops);
	counter_u64_zero(r->starts);
	counter_u64_zero(r->stalls);
	counter_u64_zero(r->restarts);
	counter_u64_zero(r->abdications);
}
int
ifmp_ring_is_idle(struct ifmp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
	    s.flags == IDLE)
		return (1);

	return (0);
}
int
ifmp_ring_is_stalled(struct ifmp_ring *r)
{
	union ring_state s;

	s.state = r->state;
	if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
		return (1);

	return (0);
}