tests: Fix test for IPv6 support.
[nbdkit/ericb.git] / filters / rate / bucket.c
blob4913abc4661daede16b78771be4a6da2b96f94c3
1 /* nbdkit
2 * Copyright (C) 2018-2019 Red Hat Inc.
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
6 * met:
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Red Hat nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
20 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
22 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
23 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
27 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
29 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
30 * SUCH DAMAGE.
/* This filter is implemented using a Token Bucket
 * (https://en.wikipedia.org/wiki/Token_bucket).  There are two
 * buckets per connection (one each for reading and writing) and two
 * global buckets (also for reading and writing).
 *
 * We add tokens at the desired rate (the per-connection rate for the
 * connection buckets, and the global rate for the global buckets).
 * Note that we don't actually keep the buckets updated in real time
 * because as a filter we are called asynchronously.  Instead for each
 * bucket we store the last time we were called and add the
 * appropriate number of tokens when we are called next.
 *
 * The bucket capacity controls the burstiness allowed.  This is
 * hard-coded at the moment but could be configurable.  All buckets
 * start off full.
 *
 * When a packet is to be read or written, if there are sufficient
 * tokens in the bucket then the packet may be immediately passed
 * through to the underlying plugin.  The number of bits used is
 * deducted from the appropriate per-connection and global bucket.
 *
 * If there are insufficient tokens then the packet must be delayed.
 * This is done by inserting a sleep which has an estimated length
 * that is long enough based on the rate at which enough tokens will
 * replenish the bucket to allow the packet to be sent next time.
 */
60 #include <config.h>
62 #include <stdio.h>
63 #include <stdlib.h>
64 #include <stdint.h>
65 #include <inttypes.h>
66 #include <string.h>
67 #include <time.h>
68 #include <sys/time.h>
70 #include <nbdkit-filter.h>
72 #include "minmax.h"
73 #include "tvdiff.h"
75 #include "bucket.h"
/* Debug flag for this file.  When it is non-zero, bucket_run reports
 * every refill, deduction and computed sleep through nbdkit_debug.
 */
int rate_debug_bucket; /* -D rate.bucket=1 */
79 void
80 bucket_init (struct bucket *bucket, uint64_t rate, double capacity_secs)
82 bucket->rate = rate;
84 /* Store the capacity passed to this function. We will need this if
85 * we adjust the rate dynamically.
87 bucket->capacity_secs = capacity_secs;
89 /* Capacity is expressed in seconds, but we want to know the
90 * capacity in tokens, so multiply by the rate to get this.
92 bucket->capacity = rate * capacity_secs;
94 /* Buckets start off full. */
95 bucket->level = bucket->capacity;
97 gettimeofday (&bucket->tv, NULL);
100 uint64_t
101 bucket_adjust_rate (struct bucket *bucket, uint64_t rate)
103 uint64_t old_rate = bucket->rate;
105 bucket->rate = rate;
106 bucket->capacity = rate * bucket->capacity_secs;
107 if (bucket->level > bucket->capacity)
108 bucket->level = bucket->capacity;
109 return old_rate;
112 uint64_t
113 bucket_run (struct bucket *bucket, uint64_t n, struct timespec *ts)
115 struct timeval now;
116 int64_t usec;
117 uint64_t add, nsec;
119 /* rate == 0 is a special case meaning that there is no limit being
120 * enforced.
122 if (bucket->rate == 0)
123 return 0;
125 gettimeofday (&now, NULL);
127 /* Work out how much time has elapsed since we last added tokens to
128 * the bucket, and add the correct number of tokens.
130 usec = tvdiff_usec (&bucket->tv, &now);
131 if (usec < 0) /* Maybe happens if system time not monotonic? */
132 usec = 0;
134 add = bucket->rate * usec / 1000000;
135 add = MIN (add, bucket->capacity - bucket->level);
136 if (rate_debug_bucket)
137 nbdkit_debug ("bucket %p: adding %" PRIu64 " tokens, new level %" PRIu64,
138 bucket, add, bucket->level + add);
139 bucket->level += add;
140 bucket->tv = now;
142 /* Can we deduct N tokens from the bucket? If yes then we're good,
143 * and we can return 0 which means the caller won't sleep.
145 if (bucket->level >= n) {
146 if (rate_debug_bucket)
147 nbdkit_debug ("bucket %p: deducting %" PRIu64 " tokens", bucket, n);
148 bucket->level -= n;
149 return 0;
152 if (rate_debug_bucket)
153 nbdkit_debug ("bucket %p: deducting %" PRIu64 " tokens, bucket empty, "
154 "need another %" PRIu64 " tokens",
155 bucket, bucket->level, n - bucket->level);
157 n -= bucket->level;
158 bucket->level = 0;
160 /* Now we need to estimate how long it will take to add N tokens to
161 * the bucket, which is how long the caller must sleep for.
163 nsec = 1000000000 * n / bucket->rate;
164 ts->tv_sec = nsec / 1000000000;
165 ts->tv_nsec = nsec % 1000000000;
167 if (rate_debug_bucket)
168 nbdkit_debug ("bucket %p: sleeping for %.1f seconds", bucket,
169 nsec / 1000000000.);
171 return n;