Update Red Hat Copyright Notices
[nbdkit.git] / common / allocators / zstd.c
blobfc579ba4fc97eb676c09729cf69355654a7e9bf4
1 /* nbdkit
2 * Copyright Red Hat
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
6 * met:
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Red Hat nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
20 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
22 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
23 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
27 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
29 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
30 * SUCH DAMAGE.
33 #include <config.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <stdbool.h>
38 #include <stdint.h>
39 #include <inttypes.h>
40 #include <string.h>
41 #include <errno.h>
42 #include <assert.h>
44 #include <pthread.h>
46 #include <nbdkit-plugin.h>
48 #include "cleanup.h"
49 #include "iszero.h"
50 #include "vector.h"
52 #include "allocator.h"
53 #include "allocator-internal.h"
55 #ifdef HAVE_LIBZSTD
57 #include <zstd.h>
59 /* This is derived from the sparse array implementation - see
60 * common/allocators/sparse.c for details of how it works.
62 * TO DO:
64 * (1) We can avoid decompressing a page if we know we are going to
65 * write over / trim / zero the whole page.
67 * (2) Locking is correct but very naive. It should be possible to
68 * take much more fine-grained locks.
70 * (3) Better stats: Can we iterate over the page table in order to
71 * find the ratio of uncompressed : compressed?
73 * Once some optimizations are made it would be worth profiling to
74 * find the hot spots.
/* Each page holds PAGE_SIZE bytes of uncompressed virtual disk, and
 * each L2 directory maps L2_SIZE consecutive pages, so one L1 entry
 * covers PAGE_SIZE*L2_SIZE bytes of the virtual disk.
 */
#define PAGE_SIZE 32768
#define L2_SIZE 4096

/* One slot of an L2 directory: the compressed page, or NULL for a hole. */
struct l2_entry {
  void *page;                   /* Pointer to compressed data. */
};

/* One slot of the L1 directory, kept sorted by virtual offset. */
struct l1_entry {
  uint64_t offset;              /* Virtual offset of this entry. */
  struct l2_entry *l2_dir;      /* Pointer to L2 directory (L2_SIZE entries). */
};

/* Growable vector of struct l1_entry, used as the L1 directory. */
DEFINE_VECTOR_TYPE (l1_dir, struct l1_entry);
/* Per-instance state of one zstd-compressed sparse array. */
struct zstd_array {
  struct allocator a;           /* Must come first. */
  pthread_mutex_t lock;         /* Serializes all operations on this array. */
  l1_dir l1_dir;                /* L1 directory. */

  /* Compression context and decompression stream.  We use the
   * streaming API for decompression because it allows us to
   * decompress without storing the compressed size, so we need a
   * streaming object.  But in fact decompression context and stream
   * are the same thing since zstd 1.3.0.
   *
   * If we ever get serious about making this allocator work well
   * multi-threaded [at the moment the locking is too course-grained],
   * then the zstd documentation recommends creating a context per
   * thread.
   */
  ZSTD_CCtx *zcctx;
  ZSTD_DStream *zdstrm;

  /* Collect stats when we compress a page.  Used only to print the
   * overall compression ratio when the array is freed.
   */
  uint64_t stats_uncompressed_bytes;
  uint64_t stats_compressed_bytes;
};
114 /* Free L1 and/or L2 directories. */
115 static void
116 free_l2_dir (struct l2_entry *l2_dir)
118 size_t i;
120 for (i = 0; i < L2_SIZE; ++i)
121 free (l2_dir[i].page);
122 free (l2_dir);
125 static void
126 zstd_array_free (struct allocator *a)
128 struct zstd_array *za = (struct zstd_array *) a;
129 size_t i;
131 if (za) {
132 if (za->stats_compressed_bytes > 0)
133 nbdkit_debug ("zstd: compression ratio: %g : 1",
134 (double) za->stats_uncompressed_bytes /
135 za->stats_compressed_bytes);
137 ZSTD_freeCCtx (za->zcctx);
138 ZSTD_freeDStream (za->zdstrm);
139 for (i = 0; i < za->l1_dir.len; ++i)
140 free_l2_dir (za->l1_dir.ptr[i].l2_dir);
141 free (za->l1_dir.ptr);
142 pthread_mutex_destroy (&za->lock);
143 free (za);
/* The zstd array grows on demand, so a size hint is not useful here. */
static int
zstd_array_set_size_hint (struct allocator *a, uint64_t size)
{
  return 0;                     /* Ignored; always succeeds. */
}
154 /* Comparison function used when searching through the L1 directory. */
155 static int
156 compare_l1_offsets (const void *offsetp, const struct l1_entry *e)
158 const uint64_t offset = *(uint64_t *)offsetp;
160 if (offset < e->offset) return -1;
161 if (offset >= e->offset + PAGE_SIZE*L2_SIZE) return 1;
162 return 0;
165 /* Insert an entry in the L1 directory, keeping it ordered by offset.
166 * This involves an expensive linear scan but should be very rare.
168 static int
169 insert_l1_entry (struct zstd_array *za, const struct l1_entry *entry)
171 size_t i;
173 for (i = 0; i < za->l1_dir.len; ++i) {
174 if (entry->offset < za->l1_dir.ptr[i].offset) {
175 /* Insert new entry before i'th directory entry. */
176 if (l1_dir_insert (&za->l1_dir, *entry, i) == -1) {
177 nbdkit_error ("realloc: %m");
178 return -1;
180 if (za->a.debug)
181 nbdkit_debug ("%s: inserted new L1 entry for %" PRIu64
182 " at l1_dir.ptr[%zu]",
183 __func__, entry->offset, i);
184 return 0;
187 /* This should never happens since each entry in the the L1
188 * directory is supposed to be unique.
190 assert (entry->offset != za->l1_dir.ptr[i].offset);
193 /* Insert new entry at the end. */
194 if (l1_dir_append (&za->l1_dir, *entry) == -1) {
195 nbdkit_error ("realloc: %m");
196 return -1;
198 if (za->a.debug)
199 nbdkit_debug ("%s: inserted new L1 entry for %" PRIu64
200 " at end of l1_dir", __func__, entry->offset);
201 return 0;
/* Look up a virtual offset.
 *
 * If the L2 page is mapped then this uncompresses the page into the
 * caller's buffer (of size PAGE_SIZE), returning the address of the
 * offset, the count of bytes to the end of the page, and a pointer to
 * the L2 directory entry containing the page pointer.
 *
 * If the L2 page is not mapped this clears the caller's buffer, also
 * returning the pointer.
 *
 * To read data you don't need to do anything else.
 *
 * To write data, after updating the buffer, you must subsequently
 * call compress() below.
 *
 * This function cannot return an error.
 *
 * NOTE(review): *l2_entry is only written when an L1 entry covers the
 * offset; on the "no entry found" path below it is left untouched, so
 * callers that test it afterwards must initialize it themselves.
 */
static void *
lookup_decompress (struct zstd_array *za, uint64_t offset, void *buf,
                   uint64_t *remaining, struct l2_entry **l2_entry)
{
  struct l1_entry *entry;
  struct l2_entry *l2_dir;
  uint64_t o;
  void *page;

  /* Bytes from offset to the end of its containing page. */
  *remaining = PAGE_SIZE - (offset & (PAGE_SIZE-1));

  /* Search the L1 directory. */
  entry = l1_dir_search (&za->l1_dir, &offset, compare_l1_offsets);

  if (za->a.debug) {
    if (entry)
      nbdkit_debug ("%s: search L1 dir: entry found: offset %" PRIu64,
                    __func__, entry->offset);
    else
      nbdkit_debug ("%s: search L1 dir: no entry found", __func__);
  }

  if (entry) {
    l2_dir = entry->l2_dir;

    /* Which page in the L2 directory? */
    o = (offset - entry->offset) / PAGE_SIZE;
    if (l2_entry)
      *l2_entry = &l2_dir[o];
    page = l2_dir[o].page;

    if (page) {
      /* Decompress the page into the user buffer.  We assume this can
       * never fail since the only pages we decompress are ones we
       * have compressed.  We use the streaming API because the normal
       * ZSTD_decompressDCtx function requires the compressed size,
       * whereas the streaming API does not.
       *
       * SIZE_MAX as the input size works because decompression stops
       * once PAGE_SIZE output bytes have been produced.
       */
      ZSTD_inBuffer inb = { .src = page, .size = SIZE_MAX, .pos = 0 };
      ZSTD_outBuffer outb = { .dst = buf, .size = PAGE_SIZE, .pos = 0 };

      ZSTD_initDStream (za->zdstrm);
      while (outb.pos < outb.size)
        ZSTD_decompressStream (za->zdstrm, &outb, &inb);
      assert (outb.pos == PAGE_SIZE);
    }
    else
      /* Mapped L2 directory but hole page: all zeroes. */
      memset (buf, 0, PAGE_SIZE);

    return buf + (offset & (PAGE_SIZE-1));
  }

  /* No L1 directory entry found. */
  memset (buf, 0, PAGE_SIZE);
  return buf + (offset & (PAGE_SIZE-1));
}
278 /* Compress a page back after modifying it.
280 * This replaces a L2 page with a new version compressed from the
281 * modified user buffer.
283 * It may fail, calling nbdkit_error and returning -1.
285 static int
286 compress (struct zstd_array *za, uint64_t offset, void *buf)
288 struct l1_entry *entry;
289 struct l2_entry *l2_dir;
290 uint64_t o;
291 void *page;
292 struct l1_entry new_entry;
293 size_t n;
295 again:
296 /* Search the L1 directory. */
297 entry = l1_dir_search (&za->l1_dir, &offset, compare_l1_offsets);
299 if (za->a.debug) {
300 if (entry)
301 nbdkit_debug ("%s: search L1 dir: entry found: offset %" PRIu64,
302 __func__, entry->offset);
303 else
304 nbdkit_debug ("%s: search L1 dir: no entry found", __func__);
307 if (entry) {
308 l2_dir = entry->l2_dir;
310 /* Which page in the L2 directory? */
311 o = (offset - entry->offset) / PAGE_SIZE;
312 free (l2_dir[o].page);
313 l2_dir[o].page = NULL;
315 /* Allocate a new page. */
316 n = ZSTD_compressBound (PAGE_SIZE);
317 page = malloc (n);
318 if (page == NULL) {
319 nbdkit_error ("malloc: %m");
320 return -1;
322 n = ZSTD_compressCCtx (za->zcctx, page, n,
323 buf, PAGE_SIZE, ZSTD_CLEVEL_DEFAULT);
324 if (ZSTD_isError (n)) {
325 nbdkit_error ("ZSTD_compressCCtx: %s", ZSTD_getErrorName (n));
326 return -1;
328 page = realloc (page, n);
329 assert (page != NULL);
330 l2_dir[o].page = page;
331 za->stats_uncompressed_bytes += PAGE_SIZE;
332 za->stats_compressed_bytes += n;
333 return 0;
336 /* No L1 directory entry, so we need to allocate a new L1 directory
337 * entry and insert it in the L1 directory, and allocate the L2
338 * directory with NULL page pointers. Then we can repeat the above
339 * search to create the page.
341 new_entry.offset = offset & ~(PAGE_SIZE*L2_SIZE-1);
342 new_entry.l2_dir = calloc (L2_SIZE, sizeof (struct l2_entry));
343 if (new_entry.l2_dir == NULL) {
344 nbdkit_error ("calloc: %m");
345 return -1;
347 if (insert_l1_entry (za, &new_entry) == -1) {
348 free (new_entry.l2_dir);
349 return -1;
351 goto again;
354 static int
355 zstd_array_read (struct allocator *a,
356 void *buf, uint64_t count, uint64_t offset)
358 struct zstd_array *za = (struct zstd_array *) a;
359 ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&za->lock);
360 CLEANUP_FREE void *tbuf = NULL;
361 uint64_t n;
362 void *p;
364 tbuf = malloc (PAGE_SIZE);
365 if (tbuf == NULL) {
366 nbdkit_error ("malloc: %m");
367 return -1;
370 while (count > 0) {
371 p = lookup_decompress (za, offset, tbuf, &n, NULL);
372 if (n > count)
373 n = count;
375 memcpy (buf, p, n);
377 buf += n;
378 count -= n;
379 offset += n;
382 return 0;
385 static int
386 zstd_array_write (struct allocator *a,
387 const void *buf, uint64_t count, uint64_t offset)
389 struct zstd_array *za = (struct zstd_array *) a;
390 ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&za->lock);
391 CLEANUP_FREE void *tbuf = NULL;
392 uint64_t n;
393 void *p;
395 tbuf = malloc (PAGE_SIZE);
396 if (tbuf == NULL) {
397 nbdkit_error ("malloc: %m");
398 return -1;
401 while (count > 0) {
402 p = lookup_decompress (za, offset, tbuf, &n, NULL);
404 if (n > count)
405 n = count;
406 memcpy (p, buf, n);
408 if (compress (za, offset, tbuf) == -1)
409 return -1;
411 buf += n;
412 count -= n;
413 offset += n;
416 return 0;
419 static int zstd_array_zero (struct allocator *a,
420 uint64_t count, uint64_t offset);
422 static int
423 zstd_array_fill (struct allocator *a, char c,
424 uint64_t count, uint64_t offset)
426 struct zstd_array *za = (struct zstd_array *) a;
427 CLEANUP_FREE void *tbuf = NULL;
428 uint64_t n;
429 void *p;
431 if (c == 0) {
432 zstd_array_zero (a, count, offset);
433 return 0;
436 ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&za->lock);
438 tbuf = malloc (PAGE_SIZE);
439 if (tbuf == NULL) {
440 nbdkit_error ("malloc: %m");
441 return -1;
444 while (count > 0) {
445 p = lookup_decompress (za, offset, tbuf, &n, NULL);
447 if (n > count)
448 n = count;
449 memset (p, c, n);
451 if (compress (za, offset, tbuf) == -1)
452 return -1;
454 count -= n;
455 offset += n;
458 return 0;
461 static int
462 zstd_array_zero (struct allocator *a, uint64_t count, uint64_t offset)
464 struct zstd_array *za = (struct zstd_array *) a;
465 ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&za->lock);
466 CLEANUP_FREE void *tbuf = NULL;
467 uint64_t n;
468 void *p;
469 struct l2_entry *l2_entry = NULL;
471 tbuf = malloc (PAGE_SIZE);
472 if (tbuf == NULL) {
473 nbdkit_error ("malloc: %m");
474 return -1;
477 while (count > 0) {
478 p = lookup_decompress (za, offset, tbuf, &n, &l2_entry);
480 if (n > count)
481 n = count;
482 memset (p, 0, n);
484 if (l2_entry && l2_entry->page) {
485 /* If the whole page is now zero, free it. */
486 if (n >= PAGE_SIZE || is_zero (l2_entry->page, PAGE_SIZE)) {
487 if (za->a.debug)
488 nbdkit_debug ("%s: freeing zero page at offset %" PRIu64,
489 __func__, offset);
490 free (l2_entry->page);
491 l2_entry->page = NULL;
493 else {
494 if (compress (za, offset, tbuf) == -1)
495 return -1;
499 count -= n;
500 offset += n;
503 return 0;
506 static int
507 zstd_array_blit (struct allocator *a1,
508 struct allocator *a2,
509 uint64_t count,
510 uint64_t offset1, uint64_t offset2)
512 struct zstd_array *za2 = (struct zstd_array *) a2;
513 ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&za2->lock);
514 CLEANUP_FREE void *tbuf = NULL;
515 uint64_t n;
516 void *p;
518 assert (a1 != a2);
519 assert (strcmp (a2->f->type, "zstd") == 0);
521 tbuf = malloc (PAGE_SIZE);
522 if (tbuf == NULL) {
523 nbdkit_error ("malloc: %m");
524 return -1;
527 while (count > 0) {
528 p = lookup_decompress (za2, offset2, tbuf, &n, NULL);
530 if (n > count)
531 n = count;
533 /* Read the source allocator (a1) directly to p which points into
534 * the right place in za2.
536 if (a1->f->read (a1, p, n, offset1) == -1)
537 return -1;
539 if (compress (za2, offset2, tbuf) == -1)
540 return -1;
542 count -= n;
543 offset1 += n;
544 offset2 += n;
547 return 0;
550 static int
551 zstd_array_extents (struct allocator *a,
552 uint64_t count, uint64_t offset,
553 struct nbdkit_extents *extents)
555 struct zstd_array *za = (struct zstd_array *) a;
556 ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&za->lock);
557 CLEANUP_FREE void *buf = NULL;
558 uint64_t n;
559 uint32_t type;
560 void *p;
561 struct l2_entry *l2_entry;
563 buf = malloc (PAGE_SIZE);
564 if (buf == NULL) {
565 nbdkit_error ("malloc: %m");
566 return -1;
569 while (count > 0) {
570 p = lookup_decompress (za, offset, buf, &n, &l2_entry);
572 /* Work out the type of this extent. */
573 if (l2_entry->page == NULL)
574 /* No backing page, so it's a hole. */
575 type = NBDKIT_EXTENT_HOLE | NBDKIT_EXTENT_ZERO;
576 else {
577 if (is_zero (p, n))
578 /* A backing page and it's all zero, it's a zero extent. */
579 type = NBDKIT_EXTENT_ZERO;
580 else
581 /* Normal allocated data. */
582 type = 0;
584 if (nbdkit_add_extent (extents, offset, n, type) == -1)
585 return -1;
587 if (n > count)
588 n = count;
590 count -= n;
591 offset += n;
594 return 0;
597 struct allocator *
598 zstd_array_create (const void *paramsv)
600 const allocator_parameters *params = paramsv;
601 struct zstd_array *za;
603 if (params->len > 0) {
604 nbdkit_error ("allocator=zstd does not take extra parameters");
605 return NULL;
608 za = calloc (1, sizeof *za);
609 if (za == NULL) {
610 nbdkit_error ("calloc: %m");
611 return NULL;
614 pthread_mutex_init (&za->lock, NULL);
616 za->zcctx = ZSTD_createCCtx ();
617 if (za->zcctx == NULL) {
618 nbdkit_error ("ZSTD_createCCtx: %m");
619 free (za);
620 return NULL;
622 za->zdstrm = ZSTD_createDStream ();
623 if (za->zdstrm == NULL) {
624 nbdkit_error ("ZSTD_createDStream: %m");
625 ZSTD_freeCCtx (za->zcctx);
626 free (za);
627 return NULL;
630 za->stats_uncompressed_bytes = za->stats_compressed_bytes = 0;
632 return (struct allocator *) za;
/* Operation table for allocator=zstd, handed to the allocator
 * framework by register_zstd_array below.
 */
static struct allocator_functions functions = {
  .type = "zstd",
  .create = zstd_array_create,
  .free = zstd_array_free,
  .set_size_hint = zstd_array_set_size_hint,
  .read = zstd_array_read,
  .write = zstd_array_write,
  .fill = zstd_array_fill,
  .zero = zstd_array_zero,
  .blit = zstd_array_blit,
  .extents = zstd_array_extents,
};
/* Register this allocator at library load time (runs before main). */
static void register_zstd_array (void) __attribute__ ((constructor));

static void
register_zstd_array (void)
{
  register_allocator (&functions);
}
656 #endif /* !HAVE_LIBZSTD */