Update Red Hat Copyright Notices
[nbdkit.git] / filters / rate / bucket.c
blob5c228fef69445d5d9411d1a87767d31cc68c5ac0
1 /* nbdkit
2 * Copyright Red Hat
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
6 * met:
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Red Hat nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
20 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
22 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
23 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
27 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
29 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
30 * SUCH DAMAGE.
33 /* This filter is implemented using a Token Bucket
34 * (https://en.wikipedia.org/wiki/Token_bucket). There are two
35 * buckets per connection (one each for reading and writing) and two
36 * global buckets (also for reading and writing).
38 * │ │ ← bucket->capacity
39 * │ │
40 * │░░░░░░░│ ← bucket->level
41 * │░░░░░░░│
42 * │░░░░░░░│
43 * └───────┘
45 * We add tokens at the desired rate (the per-connection rate for the
46 * connection buckets, and the global rate for the global buckets).
47 * Note that we don't actually keep the buckets updated in real time
48 * because as a filter we are called asynchronously. Instead for each
49 * bucket we store the last time we were called and add the
50 * appropriate number of tokens when we are called next.
52 * The bucket capacity controls the burstiness allowed. This is
53 * hard-coded at the moment but could be configurable. All buckets
54 * start off full.
56 * When a packet is to be read or written, if there are sufficient
57 * tokens in the bucket then the packet may be immediately passed
58 * through to the underlying plugin. The number of bits used is
59 * deducted from the appropriate per-connection and global bucket.
61 * If there are insufficient tokens then the packet must be delayed.
62 * This is done by inserting a sleep which has an estimated length
63 * that is long enough based on the rate at which enough tokens will
64 * replenish the bucket to allow the packet to be sent next time.
67 #include <config.h>
69 #include <stdio.h>
70 #include <stdlib.h>
71 #include <stdint.h>
72 #include <inttypes.h>
73 #include <string.h>
74 #include <time.h>
75 #include <sys/time.h>
77 #include <nbdkit-filter.h>
79 #include "minmax.h"
80 #include "tvdiff.h"
82 #include "bucket.h"
84 NBDKIT_DLL_PUBLIC int rate_debug_bucket; /* -D rate.bucket=1: when non-zero, bucket_run reports its token accounting through nbdkit_debug */
86 void
87 bucket_init (struct bucket *bucket, uint64_t rate, double capacity_secs)
89 bucket->rate = rate;
91 /* Store the capacity passed to this function. We will need this if
92 * we adjust the rate dynamically.
94 bucket->capacity_secs = capacity_secs;
96 /* Capacity is expressed in seconds, but we want to know the
97 * capacity in tokens, so multiply by the rate to get this.
99 bucket->capacity = rate * capacity_secs;
101 /* Buckets start off full. */
102 bucket->level = bucket->capacity;
104 gettimeofday (&bucket->tv, NULL);
107 uint64_t
108 bucket_adjust_rate (struct bucket *bucket, uint64_t rate)
110 uint64_t old_rate = bucket->rate;
112 bucket->rate = rate;
113 bucket->capacity = rate * bucket->capacity_secs;
114 if (bucket->level > bucket->capacity)
115 bucket->level = bucket->capacity;
116 return old_rate;
119 uint64_t
120 bucket_run (struct bucket *bucket, const char *bucket_name,
121 uint64_t n, struct timespec *ts)
123 struct timeval now;
124 int64_t usec;
125 uint64_t add, nsec;
127 /* rate == 0 is a special case meaning that there is no limit being
128 * enforced.
130 if (bucket->rate == 0)
131 return 0;
133 gettimeofday (&now, NULL);
135 /* Work out how much time has elapsed since we last added tokens to
136 * the bucket, and add the correct number of tokens.
138 usec = tvdiff_usec (&bucket->tv, &now);
139 if (usec < 0) /* Maybe happens if system time not monotonic? */
140 usec = 0;
142 add = bucket->rate * usec / 1000000;
143 add = MIN (add, bucket->capacity - bucket->level);
144 if (rate_debug_bucket)
145 nbdkit_debug ("bucket %s: adding %" PRIu64 " tokens, new level %" PRIu64,
146 bucket_name, add, bucket->level + add);
147 bucket->level += add;
148 bucket->tv = now;
150 /* Can we deduct N tokens from the bucket? If yes then we're good,
151 * and we can return 0 which means the caller won't sleep.
153 if (bucket->level >= n) {
154 if (rate_debug_bucket)
155 nbdkit_debug ("bucket %s: deducting %" PRIu64 " tokens", bucket_name, n);
156 bucket->level -= n;
157 return 0;
160 if (rate_debug_bucket)
161 nbdkit_debug ("bucket %s: deducting %" PRIu64 " tokens, bucket empty, "
162 "need another %" PRIu64 " tokens",
163 bucket_name, bucket->level, n - bucket->level);
165 n -= bucket->level;
166 bucket->level = 0;
168 /* Now we need to estimate how long it will take to add N tokens to
169 * the bucket, which is how long the caller must sleep for.
171 nsec = 1000000000 * n / bucket->rate;
172 ts->tv_sec = nsec / 1000000000;
173 ts->tv_nsec = nsec % 1000000000;
175 if (rate_debug_bucket)
176 nbdkit_debug ("bucket %p: sleeping for %.1f seconds", bucket,
177 nsec / 1000000000.);
179 return n;