2 * Copyright (C) 2014 Red Hat Inc.
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Red Hat nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
20 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
22 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
23 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
27 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
29 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
40 #include <sys/types.h>
44 #include <nbdkit-plugin.h>
46 static char *filename
= NULL
;
49 /* In theory INT64_MAX, but it breaks qemu's NBD driver. */
50 static int64_t size
= INT64_MAX
/2;
52 /* Flag if we have entered the unrecoverable error state because of
55 static int errorstate
= 0;
57 /* Highest byte (+1) that has been written in the data stream. */
58 static uint64_t highestwrite
= 0;
60 /* Called for each key=value passed on the command line. */
62 streaming_config (const char *key
, const char *value
)
64 if (strcmp (key
, "pipe") == 0) {
65 /* See FILENAMES AND PATHS in nbdkit-plugin(3). */
66 filename
= nbdkit_absolute_path (value
);
70 else if (strcmp (key
, "size") == 0) {
71 size
= nbdkit_parse_size (value
);
76 nbdkit_error ("unknown parameter '%s'", key
);
83 /* Check the user did pass a pipe=<FILENAME> parameter. */
85 streaming_config_complete (void)
87 if (filename
== NULL
) {
88 nbdkit_error ("you must supply the pipe=<FILENAME> parameter "
89 "after the plugin name on the command line");
93 /* Open the file blindly. If this fails with ENOENT then we create a
97 fd
= open (filename
, O_RDWR
|O_CLOEXEC
|O_NOCTTY
);
99 if (errno
!= ENOENT
) {
100 nbdkit_error ("open: %s: %m", filename
);
103 if (mknod (filename
, S_IFIFO
| 0666, 0) == -1) {
104 nbdkit_error ("mknod: %s: %m", filename
);
113 /* nbdkit is shutting down. */
115 streaming_unload (void)
122 #define streaming_config_help \
123 "pipe=<FILENAME> (required) The filename to serve.\n" \
124 "size=<SIZE> (optional) Stream size."
126 /* Create the per-connection handle. */
128 streaming_open (int readonly
)
133 nbdkit_error ("you cannot use the -r option with the streaming plugin");
138 nbdkit_error ("unrecoverable error state, "
139 "no new connections can be opened");
143 /* There is no handle, so return an arbitrary non-NULL pointer. */
149 /* Free up the per-connection handle. */
151 streaming_close (void *handle
)
155 #define THREAD_MODEL NBDKIT_THREAD_MODEL_SERIALIZE_ALL_REQUESTS
157 /* Return the size of the stream (infinite). */
159 streaming_get_size (void *handle
)
164 /* Write data to the stream. */
166 streaming_pwrite (void *handle
, const void *buf
,
167 uint32_t count
, uint64_t offset
)
173 nbdkit_error ("unrecoverable error state");
178 if (offset
< highestwrite
) {
179 nbdkit_error ("client tried to seek backwards and write: "
180 "the streaming plugin does not currently support this");
186 /* Need to write some zeroes. */
187 if (offset
> highestwrite
) {
188 int64_t remaining
= offset
- highestwrite
;
189 static char zerobuf
[4096];
191 while (remaining
> 0) {
192 n
= remaining
> sizeof zerobuf
? sizeof zerobuf
: remaining
;
193 r
= write (fd
, zerobuf
, n
);
195 nbdkit_error ("write: %m");
204 /* Write the data. */
206 r
= write (fd
, buf
, count
);
208 nbdkit_error ("write: %m");
220 /* Read data back from the stream. */
222 streaming_pread (void *handle
, void *buf
, uint32_t count
, uint64_t offset
)
225 nbdkit_error ("unrecoverable error state");
230 /* Allow reads which are entirely >= highestwrite. These return zeroes. */
231 if (offset
>= highestwrite
) {
232 memset (buf
, 0, count
);
236 nbdkit_error ("client tried to read: "
237 "the streaming plugin does not currently support this");
243 static struct nbdkit_plugin plugin
= {
245 .longname
= "nbdkit streaming plugin",
246 .version
= PACKAGE_VERSION
,
247 .unload
= streaming_unload
,
248 .config
= streaming_config
,
249 .config_complete
= streaming_config_complete
,
250 .config_help
= streaming_config_help
,
251 .open
= streaming_open
,
252 .close
= streaming_close
,
253 .get_size
= streaming_get_size
,
254 .pwrite
= streaming_pwrite
,
255 .pread
= streaming_pread
,
256 .errno_is_preserved
= 1,
259 NBDKIT_REGISTER_PLUGIN(plugin
)