4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Red Hat nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
20 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
22 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
23 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
27 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
29 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33 /* This filter is implemented using a Token Bucket
34 * (https://en.wikipedia.org/wiki/Token_bucket). There are two
35 * buckets per connection (one each for reading and writing) and two
36 * global buckets (also for reading and writing).
38 * │ │ ← bucket->capacity
40 * │░░░░░░░│ ← bucket->level
45 * We add tokens at the desired rate (the per-connection rate for the
46 * connection buckets, and the global rate for the global buckets).
47 * Note that we don't actually keep the buckets updated in real time
48 * because as a filter we are called asynchronously. Instead for each
49 * bucket we store the last time we were called and add the
50 * appropriate number of tokens when we are called next.
52 * The bucket capacity controls the burstiness allowed. This is
53 * hard-coded at the moment but could be configurable. All buckets
56 * When a packet is to be read or written, if there are sufficient
57 * tokens in the bucket then the packet may be immediately passed
58 * through to the underlying plugin. The number of bits used is
59 * deducted from the appropriate per-connection and global bucket.
61 * If there are insufficient tokens then the packet must be delayed.
62 * This is done by inserting a sleep which has an estimated length
63 * that is long enough based on the rate at which enough tokens will
64 * replenish the bucket to allow the packet to be sent next time.
77 #include <nbdkit-filter.h>
84 NBDKIT_DLL_PUBLIC
int rate_debug_bucket
; /* -D rate.bucket=1 */
87 bucket_init (struct bucket
*bucket
, uint64_t rate
, double capacity_secs
)
91 /* Store the capacity passed to this function. We will need this if
92 * we adjust the rate dynamically.
94 bucket
->capacity_secs
= capacity_secs
;
96 /* Capacity is expressed in seconds, but we want to know the
97 * capacity in tokens, so multiply by the rate to get this.
99 bucket
->capacity
= rate
* capacity_secs
;
101 /* Buckets start off full. */
102 bucket
->level
= bucket
->capacity
;
104 gettimeofday (&bucket
->tv
, NULL
);
108 bucket_adjust_rate (struct bucket
*bucket
, uint64_t rate
)
110 uint64_t old_rate
= bucket
->rate
;
113 bucket
->capacity
= rate
* bucket
->capacity_secs
;
114 if (bucket
->level
> bucket
->capacity
)
115 bucket
->level
= bucket
->capacity
;
120 bucket_run (struct bucket
*bucket
, const char *bucket_name
,
121 uint64_t n
, struct timespec
*ts
)
127 /* rate == 0 is a special case meaning that there is no limit being
130 if (bucket
->rate
== 0)
133 gettimeofday (&now
, NULL
);
135 /* Work out how much time has elapsed since we last added tokens to
136 * the bucket, and add the correct number of tokens.
138 usec
= tvdiff_usec (&bucket
->tv
, &now
);
139 if (usec
< 0) /* Maybe happens if system time not monotonic? */
142 add
= bucket
->rate
* usec
/ 1000000;
143 add
= MIN (add
, bucket
->capacity
- bucket
->level
);
144 if (rate_debug_bucket
)
145 nbdkit_debug ("bucket %s: adding %" PRIu64
" tokens, new level %" PRIu64
,
146 bucket_name
, add
, bucket
->level
+ add
);
147 bucket
->level
+= add
;
150 /* Can we deduct N tokens from the bucket? If yes then we're good,
151 * and we can return 0 which means the caller won't sleep.
153 if (bucket
->level
>= n
) {
154 if (rate_debug_bucket
)
155 nbdkit_debug ("bucket %s: deducting %" PRIu64
" tokens", bucket_name
, n
);
160 if (rate_debug_bucket
)
161 nbdkit_debug ("bucket %s: deducting %" PRIu64
" tokens, bucket empty, "
162 "need another %" PRIu64
" tokens",
163 bucket_name
, bucket
->level
, n
- bucket
->level
);
168 /* Now we need to estimate how long it will take to add N tokens to
169 * the bucket, which is how long the caller must sleep for.
171 nsec
= 1000000000 * n
/ bucket
->rate
;
172 ts
->tv_sec
= nsec
/ 1000000000;
173 ts
->tv_nsec
= nsec
% 1000000000;
175 if (rate_debug_bucket
)
176 nbdkit_debug ("bucket %p: sleeping for %.1f seconds", bucket
,