2 Simple queuing of input and output records for libctdb
4 Copyright (C) Rusty Russell 2010
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
19 #include <sys/types.h>
20 #include <sys/socket.h>
27 #include "libctdb_private.h"
30 #include <netinet/in.h>
31 #include <dlinklist.h>
32 #include <ctdb_protocol.h> // For CTDB_DS_ALIGNMENT and ctdb_req_header
/*
 * One queued I/O record.
 *
 * next/prev are the link pointers; presumably they are what the DLIST_*
 * macros used on ctdb->inqueue elsewhere in this file operate on -- confirm
 * against dlinklist.h.
 * NOTE(review): only the link pointers are visible in this excerpt. Other
 * code in this file also reads ->data, ->len and ->off, whose declarations
 * are not shown here.
 */
35 struct io_elem
*next
, *prev
;
/*
 * Allocate a new io_elem able to hold a record of `len` bytes.
 *
 * Both the element itself and its data buffer are obtained with malloc().
 * NOTE(review): the local declarations, any allocation-failure handling and
 * the return statement are not visible in this excerpt.
 */
40 struct io_elem
*new_io_elem(size_t len
)
/* Round len up to a multiple of CTDB_DS_ALIGNMENT.  The mask arithmetic
 * assumes that constant is a power of two -- TODO confirm. */
45 len
= (len
+ (CTDB_DS_ALIGNMENT
-1)) & ~(CTDB_DS_ALIGNMENT
-1);
47 elem
= malloc(sizeof(*elem
));
/* The buffer is allocated at the rounded-up size, not the requested one. */
50 elem
->data
= malloc(len
);
56 /* Stamp out any padding to keep valgrind happy: zero the len-ask bytes
    * that follow the first `ask` bytes of the buffer.
    * NOTE(review): `ask` is presumably the caller's original, unrounded
    * length; its definition is not visible here -- confirm. */
58 memset(elem
->data
+ ask
, 0, len
-ask
);
/*
 * Release an io_elem.
 * NOTE(review): the function body is not visible in this excerpt; presumably
 * it frees both io->data and io itself (the two malloc()s made by
 * new_io_elem) -- confirm.
 */
67 void free_io_elem(struct io_elem
*io
)
/*
 * Returns true when io's offset has reached its length, i.e. when the
 * remaining byte count (io->len - io->off) used by read_io_elem() and
 * write_io_elem() is zero.
 */
73 bool io_elem_finished(const struct io_elem
*io
)
75 return io
->off
== io
->len
;
/*
 * Fill in the struct ctdb_req_header obtained from io_elem_data(io, NULL).
 *
 * length is set to io->len, ctdb_magic/ctdb_version to CTDB_MAGIC and
 * CTDB_VERSION, and destnode/operation are copied from the same-named
 * arguments.
 * NOTE(review): the remainder of the parameter list and some of the field
 * assignments are not visible in this excerpt.
 */
78 void io_elem_init_req_header(struct io_elem
*io
,
83 struct ctdb_req_header
*hdr
= io_elem_data(io
, NULL
);
/* The header's length field carries the element's full length. */
85 hdr
->length
= io
->len
;
86 hdr
->ctdb_magic
= CTDB_MAGIC
;
87 hdr
->ctdb_version
= CTDB_VERSION
;
88 /* Generation and srcnode only used for inter-ctdbd communication. */
90 hdr
->destnode
= destnode
;
92 hdr
->operation
= operation
;
96 /* Access to raw data: if len is non-NULL it is filled in.
    * NOTE(review): the body is not visible in this excerpt; presumably the
    * return value is io->data and *len receives io->len -- confirm. */
97 void *io_elem_data(const struct io_elem
*io
, size_t *len
)
104 /* Read from fd into io, continuing at io->off for at most the remaining
     * io->len - io->off bytes.
     * Returns -1 if we hit an error. Errno will be set.
     * NOTE(review): the local declarations, the handling of read()'s return
     * value and the non-error return value are not visible in this excerpt. */
105 int read_io_elem(int fd
, struct io_elem
*io
)
109 ret
= read(fd
, io
->data
+ io
->off
, io
->len
- io
->off
);
114 if (io_elem_finished(io
)) {
/* Interpret the start of the buffer as a request header. */
115 struct ctdb_req_header
*hdr
= (void *)io
->data
;
117 /* Finished. But maybe this was just header? */
/* Only an element that is exactly header-sized, and whose header announces
 * a longer packet, is grown. */
118 if (io
->len
== sizeof(*hdr
) && hdr
->length
> io
->len
) {
121 /* Enlarge and re-read. */
/* NOTE(review): hdr->length was just read from fd; no upper bound on it is
 * visible here -- confirm whether one is enforced elsewhere. */
122 io
->len
= hdr
->length
;
/* realloc()'s result goes to a temporary rather than straight into
 * io->data.  NOTE(review): the NULL check and the assignment back to
 * io->data are not visible in this excerpt. */
123 newdata
= realloc(io
->data
, io
->len
);
127 /* Try reading again immediately. */
128 reret
= read_io_elem(fd
, io
);
137 /* Write the unsent remainder of io (io->len - io->off bytes starting at
     * io->data + io->off) to fd.
     * Returns -1 if we hit an error. Errno will be set.
     * NOTE(review): the handling of write()'s return value, including any
     * update of io->off, is not visible in this excerpt. */
138 int write_io_elem(int fd
, struct io_elem
*io
)
142 ret
= write(fd
, io
->data
+ io
->off
, io
->len
- io
->off
);
/*
 * Reset an io_elem.
 * NOTE(review): the function body is not visible in this excerpt; presumably
 * it rewinds io->off so the element can be processed again from the start --
 * confirm.
 */
150 void io_elem_reset(struct io_elem
*io
)
/*
 * Add io to the connection's inqueue list using DLIST_ADD_END (at the end of
 * the list, going by the macro's name -- see dlinklist.h).
 */
155 void io_elem_queue(struct ctdb_connection
*ctdb
, struct io_elem
*io
)
157 DLIST_ADD_END(ctdb
->inqueue
, io
, struct io_elem
);
/*
 * Remove io from the connection's inqueue list using DLIST_REMOVE.
 * NOTE(review): presumably io must currently be linked on ctdb->inqueue --
 * confirm against dlinklist.h.
 */
160 void io_elem_dequeue(struct ctdb_connection
*ctdb
, struct io_elem
*io
)
162 DLIST_REMOVE(ctdb
->inqueue
, io
);