Fixed issue with cmake version generation
[libevent.git] / bufferevent_ratelim.c
blobbde192021be14ad102011c0912ed0be8a695d04b
1 /*
2 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
4 * All rights reserved.
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 #include "evconfig-private.h"
30 #include <sys/types.h>
31 #include <limits.h>
32 #include <string.h>
33 #include <stdlib.h>
35 #include "event2/event.h"
36 #include "event2/event_struct.h"
37 #include "event2/util.h"
38 #include "event2/bufferevent.h"
39 #include "event2/bufferevent_struct.h"
40 #include "event2/buffer.h"
42 #include "ratelim-internal.h"
44 #include "bufferevent-internal.h"
45 #include "mm-internal.h"
46 #include "util-internal.h"
47 #include "event-internal.h"
49 int
50 ev_token_bucket_init_(struct ev_token_bucket *bucket,
51 const struct ev_token_bucket_cfg *cfg,
52 ev_uint32_t current_tick,
53 int reinitialize)
55 if (reinitialize) {
56 /* on reinitialization, we only clip downwards, since we've
57 already used who-knows-how-much bandwidth this tick. We
58 leave "last_updated" as it is; the next update will add the
59 appropriate amount of bandwidth to the bucket.
61 if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
62 bucket->read_limit = cfg->read_maximum;
63 if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
64 bucket->write_limit = cfg->write_maximum;
65 } else {
66 bucket->read_limit = cfg->read_rate;
67 bucket->write_limit = cfg->write_rate;
68 bucket->last_updated = current_tick;
70 return 0;
73 int
74 ev_token_bucket_update_(struct ev_token_bucket *bucket,
75 const struct ev_token_bucket_cfg *cfg,
76 ev_uint32_t current_tick)
78 /* It's okay if the tick number overflows, since we'll just
79 * wrap around when we do the unsigned substraction. */
80 unsigned n_ticks = current_tick - bucket->last_updated;
82 /* Make sure some ticks actually happened, and that time didn't
83 * roll back. */
84 if (n_ticks == 0 || n_ticks > INT_MAX)
85 return 0;
87 /* Naively, we would say
88 bucket->limit += n_ticks * cfg->rate;
90 if (bucket->limit > cfg->maximum)
91 bucket->limit = cfg->maximum;
93 But we're worried about overflow, so we do it like this:
96 if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
97 bucket->read_limit = cfg->read_maximum;
98 else
99 bucket->read_limit += n_ticks * cfg->read_rate;
102 if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
103 bucket->write_limit = cfg->write_maximum;
104 else
105 bucket->write_limit += n_ticks * cfg->write_rate;
108 bucket->last_updated = current_tick;
110 return 1;
113 static inline void
114 bufferevent_update_buckets(struct bufferevent_private *bev)
116 /* Must hold lock on bev. */
117 struct timeval now;
118 unsigned tick;
119 event_base_gettimeofday_cached(bev->bev.ev_base, &now);
120 tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
121 if (tick != bev->rate_limiting->limit.last_updated)
122 ev_token_bucket_update_(&bev->rate_limiting->limit,
123 bev->rate_limiting->cfg, tick);
126 ev_uint32_t
127 ev_token_bucket_get_tick_(const struct timeval *tv,
128 const struct ev_token_bucket_cfg *cfg)
130 /* This computation uses two multiplies and a divide. We could do
131 * fewer if we knew that the tick length was an integer number of
132 * seconds, or if we knew it divided evenly into a second. We should
133 * investigate that more.
136 /* We cast to an ev_uint64_t first, since we don't want to overflow
137 * before we do the final divide. */
138 ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
139 return (unsigned)(msec / cfg->msec_per_tick);
142 struct ev_token_bucket_cfg *
143 ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
144 size_t write_rate, size_t write_burst,
145 const struct timeval *tick_len)
147 struct ev_token_bucket_cfg *r;
148 struct timeval g;
149 if (! tick_len) {
150 g.tv_sec = 1;
151 g.tv_usec = 0;
152 tick_len = &g;
154 if (read_rate > read_burst || write_rate > write_burst ||
155 read_rate < 1 || write_rate < 1)
156 return NULL;
157 if (read_rate > EV_RATE_LIMIT_MAX ||
158 write_rate > EV_RATE_LIMIT_MAX ||
159 read_burst > EV_RATE_LIMIT_MAX ||
160 write_burst > EV_RATE_LIMIT_MAX)
161 return NULL;
162 r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
163 if (!r)
164 return NULL;
165 r->read_rate = read_rate;
166 r->write_rate = write_rate;
167 r->read_maximum = read_burst;
168 r->write_maximum = write_burst;
169 memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
170 r->msec_per_tick = (tick_len->tv_sec * 1000) +
171 (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
172 return r;
/** Release a configuration obtained from ev_token_bucket_cfg_new().
 * The pointer is handed straight to mm_free(). */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
/* Initial values of a bufferevent's max_single_read and max_single_write. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

/* Acquire / release the lock of a rate-limit group. */
#define LOCK_GROUP(grp) EVLOCK_LOCK((grp)->lock, 0)
#define UNLOCK_GROUP(grp) EVLOCK_UNLOCK((grp)->lock, 0)

/* Group-wide suspend/unsuspend helpers, defined further down. */
static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
193 /** Helper: figure out the maximum amount we should write if is_write, or
194 the maximum amount we should read if is_read. Return that maximum, or
195 0 if our bucket is wholly exhausted.
197 static inline ev_ssize_t
198 bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
200 /* needs lock on bev. */
201 ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;
203 #define LIM(x) \
204 (is_write ? (x).write_limit : (x).read_limit)
206 #define GROUP_SUSPENDED(g) \
207 (is_write ? (g)->write_suspended : (g)->read_suspended)
209 /* Sets max_so_far to MIN(x, max_so_far) */
210 #define CLAMPTO(x) \
211 do { \
212 if (max_so_far > (x)) \
213 max_so_far = (x); \
214 } while (0);
216 if (!bev->rate_limiting)
217 return max_so_far;
219 /* If rate-limiting is enabled at all, update the appropriate
220 bucket, and take the smaller of our rate limit and the group
221 rate limit.
224 if (bev->rate_limiting->cfg) {
225 bufferevent_update_buckets(bev);
226 max_so_far = LIM(bev->rate_limiting->limit);
228 if (bev->rate_limiting->group) {
229 struct bufferevent_rate_limit_group *g =
230 bev->rate_limiting->group;
231 ev_ssize_t share;
232 LOCK_GROUP(g);
233 if (GROUP_SUSPENDED(g)) {
234 /* We can get here if we failed to lock this
235 * particular bufferevent while suspending the whole
236 * group. */
237 if (is_write)
238 bufferevent_suspend_write_(&bev->bev,
239 BEV_SUSPEND_BW_GROUP);
240 else
241 bufferevent_suspend_read_(&bev->bev,
242 BEV_SUSPEND_BW_GROUP);
243 share = 0;
244 } else {
245 /* XXXX probably we should divide among the active
246 * members, not the total members. */
247 share = LIM(g->rate_limit) / g->n_members;
248 if (share < g->min_share)
249 share = g->min_share;
251 UNLOCK_GROUP(g);
252 CLAMPTO(share);
255 if (max_so_far < 0)
256 max_so_far = 0;
257 return max_so_far;
260 ev_ssize_t
261 bufferevent_get_read_max_(struct bufferevent_private *bev)
263 return bufferevent_get_rlim_max_(bev, 0);
266 ev_ssize_t
267 bufferevent_get_write_max_(struct bufferevent_private *bev)
269 return bufferevent_get_rlim_max_(bev, 1);
273 bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
275 /* XXXXX Make sure all users of this function check its return value */
276 int r = 0;
277 /* need to hold lock on bev */
278 if (!bev->rate_limiting)
279 return 0;
281 if (bev->rate_limiting->cfg) {
282 bev->rate_limiting->limit.read_limit -= bytes;
283 if (bev->rate_limiting->limit.read_limit <= 0) {
284 bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
285 if (event_add(&bev->rate_limiting->refill_bucket_event,
286 &bev->rate_limiting->cfg->tick_timeout) < 0)
287 r = -1;
288 } else if (bev->read_suspended & BEV_SUSPEND_BW) {
289 if (!(bev->write_suspended & BEV_SUSPEND_BW))
290 event_del(&bev->rate_limiting->refill_bucket_event);
291 bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
295 if (bev->rate_limiting->group) {
296 LOCK_GROUP(bev->rate_limiting->group);
297 bev->rate_limiting->group->rate_limit.read_limit -= bytes;
298 bev->rate_limiting->group->total_read += bytes;
299 if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
300 bev_group_suspend_reading_(bev->rate_limiting->group);
301 } else if (bev->rate_limiting->group->read_suspended) {
302 bev_group_unsuspend_reading_(bev->rate_limiting->group);
304 UNLOCK_GROUP(bev->rate_limiting->group);
307 return r;
311 bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
313 /* XXXXX Make sure all users of this function check its return value */
314 int r = 0;
315 /* need to hold lock */
316 if (!bev->rate_limiting)
317 return 0;
319 if (bev->rate_limiting->cfg) {
320 bev->rate_limiting->limit.write_limit -= bytes;
321 if (bev->rate_limiting->limit.write_limit <= 0) {
322 bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
323 if (event_add(&bev->rate_limiting->refill_bucket_event,
324 &bev->rate_limiting->cfg->tick_timeout) < 0)
325 r = -1;
326 } else if (bev->write_suspended & BEV_SUSPEND_BW) {
327 if (!(bev->read_suspended & BEV_SUSPEND_BW))
328 event_del(&bev->rate_limiting->refill_bucket_event);
329 bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
333 if (bev->rate_limiting->group) {
334 LOCK_GROUP(bev->rate_limiting->group);
335 bev->rate_limiting->group->rate_limit.write_limit -= bytes;
336 bev->rate_limiting->group->total_written += bytes;
337 if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
338 bev_group_suspend_writing_(bev->rate_limiting->group);
339 } else if (bev->rate_limiting->group->write_suspended) {
340 bev_group_unsuspend_writing_(bev->rate_limiting->group);
342 UNLOCK_GROUP(bev->rate_limiting->group);
345 return r;
348 /** Stop reading on every bufferevent in <b>g</b> */
349 static int
350 bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
352 /* Needs group lock */
353 struct bufferevent_private *bev;
354 g->read_suspended = 1;
355 g->pending_unsuspend_read = 0;
357 /* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
358 to prevent a deadlock. (Ordinarily, the group lock nests inside
359 the bufferevent locks. If we are unable to lock any individual
360 bufferevent, it will find out later when it looks at its limit
361 and sees that its group is suspended.)
363 LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
364 if (EVLOCK_TRY_LOCK_(bev->lock)) {
365 bufferevent_suspend_read_(&bev->bev,
366 BEV_SUSPEND_BW_GROUP);
367 EVLOCK_UNLOCK(bev->lock, 0);
370 return 0;
373 /** Stop writing on every bufferevent in <b>g</b> */
374 static int
375 bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
377 /* Needs group lock */
378 struct bufferevent_private *bev;
379 g->write_suspended = 1;
380 g->pending_unsuspend_write = 0;
381 LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
382 if (EVLOCK_TRY_LOCK_(bev->lock)) {
383 bufferevent_suspend_write_(&bev->bev,
384 BEV_SUSPEND_BW_GROUP);
385 EVLOCK_UNLOCK(bev->lock, 0);
388 return 0;
391 /** Timer callback invoked on a single bufferevent with one or more exhausted
392 buckets when they are ready to refill. */
393 static void
394 bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
396 unsigned tick;
397 struct timeval now;
398 struct bufferevent_private *bev = arg;
399 int again = 0;
400 BEV_LOCK(&bev->bev);
401 if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
402 BEV_UNLOCK(&bev->bev);
403 return;
406 /* First, update the bucket */
407 event_base_gettimeofday_cached(bev->bev.ev_base, &now);
408 tick = ev_token_bucket_get_tick_(&now,
409 bev->rate_limiting->cfg);
410 ev_token_bucket_update_(&bev->rate_limiting->limit,
411 bev->rate_limiting->cfg,
412 tick);
414 /* Now unsuspend any read/write operations as appropriate. */
415 if ((bev->read_suspended & BEV_SUSPEND_BW)) {
416 if (bev->rate_limiting->limit.read_limit > 0)
417 bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
418 else
419 again = 1;
421 if ((bev->write_suspended & BEV_SUSPEND_BW)) {
422 if (bev->rate_limiting->limit.write_limit > 0)
423 bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
424 else
425 again = 1;
427 if (again) {
428 /* One or more of the buckets may need another refill if they
429 started negative.
431 XXXX if we need to be quiet for more ticks, we should
432 maybe figure out what timeout we really want.
434 /* XXXX Handle event_add failure somehow */
435 event_add(&bev->rate_limiting->refill_bucket_event,
436 &bev->rate_limiting->cfg->tick_timeout);
438 BEV_UNLOCK(&bev->bev);
441 /** Helper: grab a random element from a bufferevent group.
443 * Requires that we hold the lock on the group.
445 static struct bufferevent_private *
446 bev_group_random_element_(struct bufferevent_rate_limit_group *group)
448 int which;
449 struct bufferevent_private *bev;
451 /* requires group lock */
453 if (!group->n_members)
454 return NULL;
456 EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
458 which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
460 bev = LIST_FIRST(&group->members);
461 while (which--)
462 bev = LIST_NEXT(bev, rate_limiting->next_in_group);
464 return bev;
/** Visit every member of the rate-limiting group 'g', starting from a
    randomly chosen member, assigning each in turn to the variable 'bev'
    and executing 'block'.  The enclosing scope must declare 'g', 'bev' and
    'first'.  The first loop runs from the random start to the end of the
    list; the second covers the members before the start.

    This is a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)					\
	do {								\
		first = bev_group_random_element_(g);			\
		for (bev = first; bev != LIST_END(&g->members);		\
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)
487 static void
488 bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
490 int again = 0;
491 struct bufferevent_private *bev, *first;
493 g->read_suspended = 0;
494 FOREACH_RANDOM_ORDER({
495 if (EVLOCK_TRY_LOCK_(bev->lock)) {
496 bufferevent_unsuspend_read_(&bev->bev,
497 BEV_SUSPEND_BW_GROUP);
498 EVLOCK_UNLOCK(bev->lock, 0);
499 } else {
500 again = 1;
503 g->pending_unsuspend_read = again;
506 static void
507 bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
509 int again = 0;
510 struct bufferevent_private *bev, *first;
511 g->write_suspended = 0;
513 FOREACH_RANDOM_ORDER({
514 if (EVLOCK_TRY_LOCK_(bev->lock)) {
515 bufferevent_unsuspend_write_(&bev->bev,
516 BEV_SUSPEND_BW_GROUP);
517 EVLOCK_UNLOCK(bev->lock, 0);
518 } else {
519 again = 1;
522 g->pending_unsuspend_write = again;
525 /** Callback invoked every tick to add more elements to the group bucket
526 and unsuspend group members as needed.
528 static void
529 bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
531 struct bufferevent_rate_limit_group *g = arg;
532 unsigned tick;
533 struct timeval now;
535 event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
537 LOCK_GROUP(g);
539 tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
540 ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);
542 if (g->pending_unsuspend_read ||
543 (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
544 bev_group_unsuspend_reading_(g);
546 if (g->pending_unsuspend_write ||
547 (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
548 bev_group_unsuspend_writing_(g);
551 /* XXXX Rather than waiting to the next tick to unsuspend stuff
552 * with pending_unsuspend_write/read, we should do it on the
553 * next iteration of the mainloop.
556 UNLOCK_GROUP(g);
560 bufferevent_set_rate_limit(struct bufferevent *bev,
561 struct ev_token_bucket_cfg *cfg)
563 struct bufferevent_private *bevp =
564 EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
565 int r = -1;
566 struct bufferevent_rate_limit *rlim;
567 struct timeval now;
568 ev_uint32_t tick;
569 int reinit = 0, suspended = 0;
570 /* XXX reference-count cfg */
572 BEV_LOCK(bev);
574 if (cfg == NULL) {
575 if (bevp->rate_limiting) {
576 rlim = bevp->rate_limiting;
577 rlim->cfg = NULL;
578 bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
579 bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
580 if (event_initialized(&rlim->refill_bucket_event))
581 event_del(&rlim->refill_bucket_event);
583 r = 0;
584 goto done;
587 event_base_gettimeofday_cached(bev->ev_base, &now);
588 tick = ev_token_bucket_get_tick_(&now, cfg);
590 if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
591 /* no-op */
592 r = 0;
593 goto done;
595 if (bevp->rate_limiting == NULL) {
596 rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
597 if (!rlim)
598 goto done;
599 bevp->rate_limiting = rlim;
600 } else {
601 rlim = bevp->rate_limiting;
603 reinit = rlim->cfg != NULL;
605 rlim->cfg = cfg;
606 ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);
608 if (reinit) {
609 EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
610 event_del(&rlim->refill_bucket_event);
612 event_assign(&rlim->refill_bucket_event, bev->ev_base,
613 -1, EV_FINALIZE, bev_refill_callback_, bevp);
615 if (rlim->limit.read_limit > 0) {
616 bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
617 } else {
618 bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
619 suspended=1;
621 if (rlim->limit.write_limit > 0) {
622 bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
623 } else {
624 bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
625 suspended = 1;
628 if (suspended)
629 event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
631 r = 0;
633 done:
634 BEV_UNLOCK(bev);
635 return r;
638 struct bufferevent_rate_limit_group *
639 bufferevent_rate_limit_group_new(struct event_base *base,
640 const struct ev_token_bucket_cfg *cfg)
642 struct bufferevent_rate_limit_group *g;
643 struct timeval now;
644 ev_uint32_t tick;
646 event_base_gettimeofday_cached(base, &now);
647 tick = ev_token_bucket_get_tick_(&now, cfg);
649 g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
650 if (!g)
651 return NULL;
652 memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
653 LIST_INIT(&g->members);
655 ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
657 event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
658 bev_group_refill_callback_, g);
659 /*XXXX handle event_add failure */
660 event_add(&g->master_refill_event, &cfg->tick_timeout);
662 EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
664 bufferevent_rate_limit_group_set_min_share(g, 64);
666 evutil_weakrand_seed_(&g->weakrand_seed,
667 (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));
669 return g;
673 bufferevent_rate_limit_group_set_cfg(
674 struct bufferevent_rate_limit_group *g,
675 const struct ev_token_bucket_cfg *cfg)
677 int same_tick;
678 if (!g || !cfg)
679 return -1;
681 LOCK_GROUP(g);
682 same_tick = evutil_timercmp(
683 &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
684 memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
686 if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
687 g->rate_limit.read_limit = cfg->read_maximum;
688 if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
689 g->rate_limit.write_limit = cfg->write_maximum;
691 if (!same_tick) {
692 /* This can cause a hiccup in the schedule */
693 event_add(&g->master_refill_event, &cfg->tick_timeout);
696 /* The new limits might force us to adjust min_share differently. */
697 bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
699 UNLOCK_GROUP(g);
700 return 0;
704 bufferevent_rate_limit_group_set_min_share(
705 struct bufferevent_rate_limit_group *g,
706 size_t share)
708 if (share > EV_SSIZE_MAX)
709 return -1;
711 g->configured_min_share = share;
713 /* Can't set share to less than the one-tick maximum. IOW, at steady
714 * state, at least one connection can go per tick. */
715 if (share > g->rate_limit_cfg.read_rate)
716 share = g->rate_limit_cfg.read_rate;
717 if (share > g->rate_limit_cfg.write_rate)
718 share = g->rate_limit_cfg.write_rate;
720 g->min_share = share;
721 return 0;
724 void
725 bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
727 LOCK_GROUP(g);
728 EVUTIL_ASSERT(0 == g->n_members);
729 event_del(&g->master_refill_event);
730 UNLOCK_GROUP(g);
731 EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
732 mm_free(g);
736 bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
737 struct bufferevent_rate_limit_group *g)
739 int wsuspend, rsuspend;
740 struct bufferevent_private *bevp =
741 EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
742 BEV_LOCK(bev);
744 if (!bevp->rate_limiting) {
745 struct bufferevent_rate_limit *rlim;
746 rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
747 if (!rlim) {
748 BEV_UNLOCK(bev);
749 return -1;
751 event_assign(&rlim->refill_bucket_event, bev->ev_base,
752 -1, EV_FINALIZE, bev_refill_callback_, bevp);
753 bevp->rate_limiting = rlim;
756 if (bevp->rate_limiting->group == g) {
757 BEV_UNLOCK(bev);
758 return 0;
760 if (bevp->rate_limiting->group)
761 bufferevent_remove_from_rate_limit_group(bev);
763 LOCK_GROUP(g);
764 bevp->rate_limiting->group = g;
765 ++g->n_members;
766 LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);
768 rsuspend = g->read_suspended;
769 wsuspend = g->write_suspended;
771 UNLOCK_GROUP(g);
773 if (rsuspend)
774 bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
775 if (wsuspend)
776 bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);
778 BEV_UNLOCK(bev);
779 return 0;
/** Remove <b>bev</b> from its rate-limit group, if any, and lift any
 * group-imposed suspension.  Returns whatever the internal helper returns. */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	const int unsuspend = 1;

	return bufferevent_remove_from_rate_limit_group_internal_(bev, unsuspend);
}
789 bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
790 int unsuspend)
792 struct bufferevent_private *bevp =
793 EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
794 BEV_LOCK(bev);
795 if (bevp->rate_limiting && bevp->rate_limiting->group) {
796 struct bufferevent_rate_limit_group *g =
797 bevp->rate_limiting->group;
798 LOCK_GROUP(g);
799 bevp->rate_limiting->group = NULL;
800 --g->n_members;
801 LIST_REMOVE(bevp, rate_limiting->next_in_group);
802 UNLOCK_GROUP(g);
804 if (unsuspend) {
805 bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
806 bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
808 BEV_UNLOCK(bev);
809 return 0;
812 /* ===
813 * API functions to expose rate limits.
815 * Don't use these from inside Libevent; they're meant to be for use by
816 * the program.
817 * === */
819 /* Mostly you don't want to use this function from inside libevent;
820 * bufferevent_get_read_max_() is more likely what you want*/
821 ev_ssize_t
822 bufferevent_get_read_limit(struct bufferevent *bev)
824 ev_ssize_t r;
825 struct bufferevent_private *bevp;
826 BEV_LOCK(bev);
827 bevp = BEV_UPCAST(bev);
828 if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
829 bufferevent_update_buckets(bevp);
830 r = bevp->rate_limiting->limit.read_limit;
831 } else {
832 r = EV_SSIZE_MAX;
834 BEV_UNLOCK(bev);
835 return r;
838 /* Mostly you don't want to use this function from inside libevent;
839 * bufferevent_get_write_max_() is more likely what you want*/
840 ev_ssize_t
841 bufferevent_get_write_limit(struct bufferevent *bev)
843 ev_ssize_t r;
844 struct bufferevent_private *bevp;
845 BEV_LOCK(bev);
846 bevp = BEV_UPCAST(bev);
847 if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
848 bufferevent_update_buckets(bevp);
849 r = bevp->rate_limiting->limit.write_limit;
850 } else {
851 r = EV_SSIZE_MAX;
853 BEV_UNLOCK(bev);
854 return r;
858 bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
860 struct bufferevent_private *bevp;
861 BEV_LOCK(bev);
862 bevp = BEV_UPCAST(bev);
863 if (size == 0 || size > EV_SSIZE_MAX)
864 bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
865 else
866 bevp->max_single_read = size;
867 BEV_UNLOCK(bev);
868 return 0;
872 bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
874 struct bufferevent_private *bevp;
875 BEV_LOCK(bev);
876 bevp = BEV_UPCAST(bev);
877 if (size == 0 || size > EV_SSIZE_MAX)
878 bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
879 else
880 bevp->max_single_write = size;
881 BEV_UNLOCK(bev);
882 return 0;
885 ev_ssize_t
886 bufferevent_get_max_single_read(struct bufferevent *bev)
888 ev_ssize_t r;
890 BEV_LOCK(bev);
891 r = BEV_UPCAST(bev)->max_single_read;
892 BEV_UNLOCK(bev);
893 return r;
896 ev_ssize_t
897 bufferevent_get_max_single_write(struct bufferevent *bev)
899 ev_ssize_t r;
901 BEV_LOCK(bev);
902 r = BEV_UPCAST(bev)->max_single_write;
903 BEV_UNLOCK(bev);
904 return r;
907 ev_ssize_t
908 bufferevent_get_max_to_read(struct bufferevent *bev)
910 ev_ssize_t r;
911 BEV_LOCK(bev);
912 r = bufferevent_get_read_max_(BEV_UPCAST(bev));
913 BEV_UNLOCK(bev);
914 return r;
917 ev_ssize_t
918 bufferevent_get_max_to_write(struct bufferevent *bev)
920 ev_ssize_t r;
921 BEV_LOCK(bev);
922 r = bufferevent_get_write_max_(BEV_UPCAST(bev));
923 BEV_UNLOCK(bev);
924 return r;
927 const struct ev_token_bucket_cfg *
928 bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
929 struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
930 struct ev_token_bucket_cfg *cfg;
932 BEV_LOCK(bev);
934 if (bufev_private->rate_limiting) {
935 cfg = bufev_private->rate_limiting->cfg;
936 } else {
937 cfg = NULL;
940 BEV_UNLOCK(bev);
942 return cfg;
945 /* Mostly you don't want to use this function from inside libevent;
946 * bufferevent_get_read_max_() is more likely what you want*/
947 ev_ssize_t
948 bufferevent_rate_limit_group_get_read_limit(
949 struct bufferevent_rate_limit_group *grp)
951 ev_ssize_t r;
952 LOCK_GROUP(grp);
953 r = grp->rate_limit.read_limit;
954 UNLOCK_GROUP(grp);
955 return r;
958 /* Mostly you don't want to use this function from inside libevent;
959 * bufferevent_get_write_max_() is more likely what you want. */
960 ev_ssize_t
961 bufferevent_rate_limit_group_get_write_limit(
962 struct bufferevent_rate_limit_group *grp)
964 ev_ssize_t r;
965 LOCK_GROUP(grp);
966 r = grp->rate_limit.write_limit;
967 UNLOCK_GROUP(grp);
968 return r;
972 bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
974 int r = 0;
975 ev_ssize_t old_limit, new_limit;
976 struct bufferevent_private *bevp;
977 BEV_LOCK(bev);
978 bevp = BEV_UPCAST(bev);
979 EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
980 old_limit = bevp->rate_limiting->limit.read_limit;
982 new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
983 if (old_limit > 0 && new_limit <= 0) {
984 bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
985 if (event_add(&bevp->rate_limiting->refill_bucket_event,
986 &bevp->rate_limiting->cfg->tick_timeout) < 0)
987 r = -1;
988 } else if (old_limit <= 0 && new_limit > 0) {
989 if (!(bevp->write_suspended & BEV_SUSPEND_BW))
990 event_del(&bevp->rate_limiting->refill_bucket_event);
991 bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
994 BEV_UNLOCK(bev);
995 return r;
999 bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
1001 /* XXXX this is mostly copy-and-paste from
1002 * bufferevent_decrement_read_limit */
1003 int r = 0;
1004 ev_ssize_t old_limit, new_limit;
1005 struct bufferevent_private *bevp;
1006 BEV_LOCK(bev);
1007 bevp = BEV_UPCAST(bev);
1008 EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
1009 old_limit = bevp->rate_limiting->limit.write_limit;
1011 new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
1012 if (old_limit > 0 && new_limit <= 0) {
1013 bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
1014 if (event_add(&bevp->rate_limiting->refill_bucket_event,
1015 &bevp->rate_limiting->cfg->tick_timeout) < 0)
1016 r = -1;
1017 } else if (old_limit <= 0 && new_limit > 0) {
1018 if (!(bevp->read_suspended & BEV_SUSPEND_BW))
1019 event_del(&bevp->rate_limiting->refill_bucket_event);
1020 bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
1023 BEV_UNLOCK(bev);
1024 return r;
1028 bufferevent_rate_limit_group_decrement_read(
1029 struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1031 int r = 0;
1032 ev_ssize_t old_limit, new_limit;
1033 LOCK_GROUP(grp);
1034 old_limit = grp->rate_limit.read_limit;
1035 new_limit = (grp->rate_limit.read_limit -= decr);
1037 if (old_limit > 0 && new_limit <= 0) {
1038 bev_group_suspend_reading_(grp);
1039 } else if (old_limit <= 0 && new_limit > 0) {
1040 bev_group_unsuspend_reading_(grp);
1043 UNLOCK_GROUP(grp);
1044 return r;
1048 bufferevent_rate_limit_group_decrement_write(
1049 struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1051 int r = 0;
1052 ev_ssize_t old_limit, new_limit;
1053 LOCK_GROUP(grp);
1054 old_limit = grp->rate_limit.write_limit;
1055 new_limit = (grp->rate_limit.write_limit -= decr);
1057 if (old_limit > 0 && new_limit <= 0) {
1058 bev_group_suspend_writing_(grp);
1059 } else if (old_limit <= 0 && new_limit > 0) {
1060 bev_group_unsuspend_writing_(grp);
1063 UNLOCK_GROUP(grp);
1064 return r;
1067 void
1068 bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
1069 ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
1071 EVUTIL_ASSERT(grp != NULL);
1072 if (total_read_out)
1073 *total_read_out = grp->total_read;
1074 if (total_written_out)
1075 *total_written_out = grp->total_written;
1078 void
1079 bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
1081 grp->total_read = grp->total_written = 0;
1085 bufferevent_ratelim_init_(struct bufferevent_private *bev)
1087 bev->rate_limiting = NULL;
1088 bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
1089 bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
1091 return 0;