/*
 * Copyright (c) 2007-2010 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"

int
ev_token_bucket_init(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* On reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		 */
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	 */

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;

	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;

	bucket->last_updated = current_tick;

	return 1;
}
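
/* Worked example of the overflow guard above (illustrative numbers, not
 * from the original source): with read_maximum == 100, read_limit == 40,
 * read_rate == 30 and n_ticks == 3, the naive refill would be
 * 40 + 3*30 == 130, past the burst cap.  The guard instead computes
 * (100 - 40) / 3 == 20, sees that 20 < 30, and saturates the bucket at
 * read_maximum -- without ever forming the product n_ticks * rate in the
 * cases where its result would not fit below the cap. */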

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}
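
/* Worked example (illustrative): with a 500-msec tick
 * (cfg->msec_per_tick == 500) and tv == {3, 250000}, msec is
 * 3*1000 + 250000/1000 == 3250, so this returns tick 3250/500 == 6.
 * The 64-bit cast matters because tv holds a wall-clock time: a 32-bit
 * tv_sec * 1000 would overflow for any date after early 1970. */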

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) + tick_len->tv_usec/1000;
	return r;
}

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
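
/* Usage sketch (not part of the original file): how an application might
 * build a quarter-second tick that allows ~4 KiB/sec with 16 KiB bursts
 * and attach it to an existing bufferevent.  The rates and the helper name
 * "example_limit_one_bufferevent" are hypothetical. */
#if 0
static int
example_limit_one_bufferevent(struct bufferevent *bev)
{
	struct timeval quarter_sec = { 0, 250000 };
	struct ev_token_bucket_cfg *cfg;

	/* 1024 bytes per 250-msec tick each way, bursting to 16384. */
	cfg = ev_token_bucket_cfg_new(1024, 16384, 1024, 16384, &quarter_sec);
	if (!cfg)
		return -1;
	/* The cfg must stay alive while the bufferevent uses it; free it
	 * with ev_token_bucket_cfg_free() only after detaching it. */
	return bufferevent_set_rate_limit(bev, cfg);
}
#endif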

/* No matter how big our bucket gets, don't try to read more than this
 * much in a single read operation. */
#define MAX_TO_READ_EVER 16384
/* No matter how big our bucket gets, don't try to write more than this
 * much in a single write operation. */
#define MAX_TO_WRITE_EVER 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read if is_write is false.  Return that
    maximum, or 0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
{
	/* Needs lock on bev. */
	ev_ssize_t max_so_far = is_write ? MAX_TO_WRITE_EVER : MAX_TO_READ_EVER;

#define LIM(x) \
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g) \
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x) \
	do { \
		if (max_so_far > (x)) \
			max_so_far = (x); \
	} while (0)

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}
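
/* Worked example of the share computation above (illustrative numbers):
 * a group with rate_limit.read_limit == 10000 bytes and n_members == 50
 * yields share == 200; with n_members == 500 the raw share would be 20,
 * so it is raised to min_share (64 by default) to keep per-member
 * operations from shrinking into uselessly tiny reads.  Because the
 * limits are signed, members may collectively overdraw the group bucket
 * a little; it then goes negative and refills over subsequent ticks. */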

ev_ssize_t
_bufferevent_get_read_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 0);
}

ev_ssize_t
_bufferevent_get_write_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 1);
}

int
_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* Need to hold lock on bev. */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			_bev_group_suspend_reading(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* Need to hold lock. */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			_bev_group_suspend_writing(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.)  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.
	 */
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
_bev_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		 */
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group. */
static struct bufferevent_private *
_bev_group_random_element(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* Requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));

	which = _evutil_weakrand() % group->n_members;

	bev = TAILQ_FIRST(&group->members);
	while (which--)
		bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block) \
	do { \
		first = _bev_group_random_element(g); \
		for (bev = first; bev != TAILQ_END(&g->members); \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ; \
		} \
		for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ; \
		} \
	} while (0)

static void
_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->write_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);
	tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
	ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		_bev_group_unsuspend_reading(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))) {
		_bev_group_unsuspend_writing(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
	    _bev_refill_callback, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	TAILQ_INIT(&g->members);

	ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);

	g->min_share = 64;
	event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
	    _bev_group_refill_callback, g);
	/*XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	return g;
}
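
/* Usage sketch (not part of the original file): capping the combined
 * bandwidth of several connections.  The helper name and the 64 KiB/sec
 * figure are hypothetical; passing NULL for tick_len selects the default
 * one-second tick, so 65536 bytes/tick is ~64 KiB/sec shared group-wide. */
#if 0
static struct bufferevent_rate_limit_group *
example_make_shared_limit(struct event_base *base,
    struct bufferevent *bev1, struct bufferevent *bev2)
{
	struct ev_token_bucket_cfg *gcfg;
	struct bufferevent_rate_limit_group *grp;

	gcfg = ev_token_bucket_cfg_new(65536, 131072, 65536, 131072, NULL);
	if (!gcfg)
		return NULL;
	grp = bufferevent_rate_limit_group_new(base, gcfg);
	if (grp) {
		bufferevent_add_to_rate_limit_group(bev1, grp);
		bufferevent_add_to_rate_limit_group(bev2, grp);
	}
	return grp;
}
#endif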

int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->min_share = share;
	return 0;
}

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
		    _bev_refill_callback, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */
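
/* Usage sketch (not part of the original file): application-side
 * accounting with the functions below, e.g. charging the read bucket for
 * bytes moved on an associated side channel that libevent never saw.  The
 * helper name and "n_bytes" are hypothetical. */
#if 0
static void
example_charge_out_of_band_read(struct bufferevent *bev, ev_ssize_t n_bytes)
{
	if (bufferevent_get_read_limit(bev) > 0)
		bufferevent_decrement_read_limit(bev, n_bytes);
}
#endif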

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = _bufferevent_get_read_max(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = _bufferevent_get_write_max(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		_bev_group_suspend_reading(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		_bev_group_unsuspend_reading(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		_bev_group_suspend_writing(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		_bev_group_unsuspend_writing(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}