Non-blocking ops now raise error-again when op can't be performed
[cl-zmq.git] / zmq.lisp
blobaf5b91fb81aa239fe781a38d1a7f148220fbe4f2
1 (in-package :cl-zmq)
3 (defcvar "errno" :int)
5 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
6 ;; 0MQ errors.
7 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
9 (defconstant hausnumero 156384712
10 "A number random anough not to collide with different errno ranges on
11 different OSes. The assumption is that error_t is at least 32-bit type.")
13 ;; On Windows platform some of the standard POSIX errnos are not defined.
14 ;; #ifndef ENOTSUP
15 ;; #define ENOTSUP (ZMQ_HAUSNUMERO + 1)
16 ;; #endif
17 ;; #ifndef EPROTONOSUPPORT
18 ;; #define EPROTONOSUPPORT (ZMQ_HAUSNUMERO + 2)
19 ;; #endif
20 ;; #ifndef ENOBUFS
21 ;; #define ENOBUFS (ZMQ_HAUSNUMERO + 3)
22 ;; #endif
23 ;; #ifndef ENETDOWN
24 ;; #define ENETDOWN (ZMQ_HAUSNUMERO + 4)
25 ;; #endif
26 ;; #ifndef EADDRINUSE
27 ;; #define EADDRINUSE (ZMQ_HAUSNUMERO + 5)
28 ;; #endif
29 ;; #ifndef EADDRNOTAVAIL
30 ;; #define EADDRNOTAVAIL (ZMQ_HAUSNUMERO + 6)
31 ;; #endif
33 ;; Native 0MQ error codes.
34 (defconstant emthread (+ hausnumero 50))
35 (defconstant efsm (+ hausnumero 51))
36 (defconstant enocompatproto (+ hausnumero 52))
38 (defcfun ("zmq_strerror" %strerror) :pointer
39 "Resolves system errors and 0MQ errors to human-readable string."
40 (errnum :int))
42 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
43 ;; 0MQ message definition.
44 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
46 (defconstant max-vsm-size 30
47 "Maximal size of \"Very Small Message\". VSMs are passed by value
48 to avoid excessive memory allocation/deallocation.
49 If VMSs larger than 255 bytes are required, type of 'vsm_size'
50 field in zmq_msg_t structure should be modified accordingly.")
52 ;; Message types. These integers may be stored in 'content' member of the
53 ;; message instead of regular pointer to the data.
54 (defconstant delimiter 31)
55 (defconstant vsm 32)
57 (def-c-struct (msg)
58 "A message. if 'shared' is true, message content pointed to by 'content'
59 is shared, i.e. reference counting is used to manage its lifetime
60 rather than straighforward malloc/free. struct zmq_msg_content is
61 not declared in the api."
62 (content :pointer)
63 (shared :uchar)
64 (vsm-size :uchar)
65 (vsm-data :uchar :count 30)) ;; FIXME max-vsm-size
67 (defcfun ("zmq_msg_init" msg-init) :int
68 "Initialise an empty message (zero bytes long)."
69 (msg msg))
71 (defcfun* ("zmq_msg_init_size" msg-init-size) :int
72 "Initialise a message 'size' bytes long.
74 Errors: ENOMEM - the size is too large to allocate."
75 (msg msg)
76 (size :long))
78 (defcallback zmq-free :void ((ptr :pointer))
79 (format t "zmq-free ~A~%" ptr)
80 (foreign-free ptr))
82 ;;typedef void (zmq_free_fn) (void *data);
83 (defcfun ("zmq_msg_init_data" msg-init-data) :int
84 "Initialise a message from an existing buffer. Message isn't copied,
85 instead 0MQ infrastructure takes ownership of the buffer and
86 deallocation function (ffn) will be called once the data are not
87 needed anymore. Note that deallocation function prototype is designed
88 so that it complies with standard C 'free' function."
89 (msg msg)
90 (data :pointer)
91 (size :long)
92 (ffn :pointer)) ; zmq_free_fn
94 (defcfun ("zmq_msg_close" msg-close) :int
95 "Deallocate the message."
96 (msg msg))
98 (defcfun ("zmq_msg_move" %msg-move) :int
99 "Move the content of the message from 'src' to 'dest'. The content isn't
100 copied, just moved. 'src' is an empty message after the call. Original
101 content of 'dest' message is deallocated."
102 (dest msg)
103 (src msg))
105 (defcfun ("zmq_msg_copy" %msg-copy) :int
106 "Copy the 'src' message to 'dest'. The content isn't copied, instead
107 reference count is increased. Don't modify the message data after the
108 call as they are shared between two messages. Original content of 'dest'
109 message is deallocated."
110 (dest msg)
111 (src msg))
113 (defcfun ("zmq_msg_data" %msg-data) :pointer
114 "Returns pointer to message data."
115 (msg msg))
117 (defcfun ("zmq_msg_size" %msg-size) :int
118 "Return size of message data (in bytes)."
119 (msg msg))
121 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
122 ;; 0MQ infrastructure (a.k.a. context) initialisation & termination.
123 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
125 (defconstant poll 1
126 "Flag specifying that the sockets within this context should be pollable.
127 This may be a little less efficient that raw non-pollable sockets.")
129 (defcfun* ("zmq_init" init) :pointer
130 "Initialise 0MQ context. 'app_threads' specifies maximal number
131 of application threads that can own open sockets at the same time.
132 'io_threads' specifies the size of thread pool to handle I/O operations.
133 'flags' argument is a bitmap composed of the flags defined above.
135 Errors: EINVAL - one of the arguments is less than zero or there are no
136 threads declared at all."
137 (app-threads :int)
138 (io-threads :int)
139 (flags :int))
141 (defcfun ("zmq_term" term) :int
142 "Deinitialise 0MQ context. If there are still open sockets, actual
143 deinitialisation of the context is delayed till all the sockets are closed."
144 (context :pointer))
146 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
147 ;; 0MQ socket definition.
148 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
150 ;; Creating a 0MQ socket.
151 ;; **********************
153 (defconstant p2p 0
154 "Socket to communicate with a single peer. Allows for a singe connect or a
155 single accept. There's no message routing or message filtering involved.")
157 (defconstant pub 1
158 "Socket to distribute data. Recv fuction is not implemented for this socket
159 type. Messages are distributed in fanout fashion to all the peers.")
161 (defconstant sub 2
162 "Socket to subscribe for data. Send function is not implemented for this
163 socket type. However, subscribe function can be used to modify the
164 message filter (see ZMQ_SUBSCRIBE socket option).")
166 (defconstant req 3
167 "Socket to send requests and receive replies. Requests are
168 load-balanced among all the peers. This socket type allows
169 only an alternated sequence of send's and recv's")
171 (defconstant rep 4
172 "Socket to receive requests and send replies. This socket type allows
173 only an alternated sequence of recv's and send's. Each send is routed to
174 the peer that issued the last received request.")
176 (defcfun* ("zmq_socket" socket) :pointer
177 "Open a socket.
179 Errors: EINVAL - invalid socket type.
180 EMFILE - the number of application threads entitled to hold open
181 sockets at the same time was exceeded."
182 (context :pointer)
183 (type :int))
185 ;; Destroying the socket.
186 ;; **********************
188 (defcfun ("zmq_close" close) :int
189 "Close the socket."
190 (s :pointer))
192 ;; Manipulating socket options.
193 ;; ****************************
195 ;; Available socket options, their types and default values.
197 (defconstant hwm 1
198 "High watermark for the message pipes associated with the socket. The water
199 mark cannot be exceeded. If the messages don't fit into the pipe emergency
200 mechanisms of the particular socket type are used (block, drop etc.) If HWM
201 is set to zero, there are no limits for the content of the pipe.
202 Type: int64_t Unit: bytes Default: 0")
204 (defconstant lwm 2
205 "Low watermark makes sense only if high watermark is defined (is non-zero).
206 When the emergency state is reached when messages overflow the pipe, the
207 emergency lasts till the size of the pipe decreases to low watermark.
208 At that point normal state is resumed.
209 Type: int64_t Unit: bytes Default: 0")
211 (defconstant swap 3
212 "Swap allows the pipe to exceed high watermark. However, the data are written
213 to the disk rather than held in the memory. While the high watermark is not
214 exceeded there is no disk activity involved though. The value of the option
215 defines maximal size of the swap file.
216 Type: int64_t Unit: bytes Default: 0")
218 (defconstant affinity 4
219 "Affinity defines which threads in the thread pool will be used to handle
220 newly created sockets. This way you can dedicate some of the threads (CPUs)
221 to a specific work. Value of 0 means no affinity, work is distributed
222 fairly among the threads in the thread pool. For non-zero values, the lowest
223 bit corresponds to the thread 1, second lowest bit to the thread 2 etc.
224 Thus, value of 3 means that from now on newly created sockets will handle
225 I/O activity exclusively using threads no. 1 and 2.
226 Type: int64_t Unit: N/A (bitmap) Default: 0")
228 (defconstant identity 5
229 "Identity of the socket. Identity is important when restarting applications.
230 If the socket has no identity, each run of the application is completely
231 separated from other runs. However, with identity application reconnects to
232 existing infrastructure left by the previous run. Thus it may receive
233 messages that were sent in the meantime, it shares pipe limits with the
234 previous run etc.
235 Type: string Unit: N/A Default: NULL")
237 (defconstant subscribe 6
238 "Applicable only to 'sub' socket type. Eastablishes new message filter.
239 When 'sub' socket is created all the incoming messages are filtered out.
240 This option allows you to subscribe for all messages (\"*\"), messages with
241 specific topic (\"x.y.z\") and/or messages with specific topic prefix
242 (\"x.y.*\"). Topic is one-byte-size-prefixed string located at
243 the very beginning of the message. Multiple filters can be attached to
244 a single 'sub' socket. In that case message passes if it matches at least
245 one of the filters.
246 Type: string Unit: N/A Default: N/A")
248 (defconstant unsubscribe 7
249 "Applicable only to 'sub' socket type. Removes existing message filter.
250 The filter specified must match the string passed to ZMQ_SUBSCRIBE options
251 exactly. If there were several instances of the same filter created,
252 this options removes only one of them, leaving the rest in place
253 and functional.
254 Type: string Unit: N/A Default: N/A")
256 (defconstant rate 8
257 "This option applies only to multicast transports (pgm & udp). It specifies
258 maximal outgoing data rate that an individual sender socket can send.
259 Type: uint64_t Unit: kilobits/second Default: 100")
261 (defconstant recovery-ivl 9
262 "This option applies only to multicast transports (pgm & udp). It specifies
263 how long can the receiver socket survive when the sender is inaccessible.
264 Keep in mind that large recovery intervals at high data rates result in
265 very large recovery buffers, meaning that you can easily overload your box
266 by setting say 1 minute recovery interval at 1Gb/s rate (requires
267 7GB in-memory buffer).
268 Type: uint64_t Unit: seconds Default: 10")
270 (defconstant mcast-loop 10
271 "This option applies only to multicast transports (pgm & udp). Value of 1
272 means that the mutlicast packets can be received on the box they were sent
273 from. Setting the value to 0 disables the loopback functionality which
274 can have negative impact on the performance. if possible, disable
275 the loopback in production environments.
276 Type: uint64_t Unit: N/A (boolean value) Default: 1")
278 (defcfun* ("zmq_setsockopt" %setsockopt) :int
279 "Sets an option on the socket. 'option' argument specifies the option (see
280 the option list above). 'optval' is a pointer to the value to set,
281 'optvallen' is the size of the value in bytes.
283 Errors: EINVAL - unknown option, a value with incorrect length
284 or invalid value."
285 (s :pointer)
286 (option :int)
287 (optval :pointer)
288 (optvallen :long))
290 ;; Creating connections.
291 ;; *********************
293 ;; Addresses are composed of the name of the protocol to use followed by ://
294 ;; and a protocol-specific address. Available protocols:
296 ;; tcp - the address is composed of IP address and port delimited by colon
297 ;; sign (:). The IP address can be a hostname (with 'connect') or
298 ;; a network interface name (with 'bind'). Examples "tcp://eth0:5555",
299 ;; "tcp://192.168.0.1:20000", "tcp://hq.mycompany.com:80".
301 ;; pgm & udp - both protocols have same address format. It's network interface
302 ;; to use, semicolon (;), multicast group IP address, colon (:) and
303 ;; port. Examples: "pgm://eth2;224.0.0.1:8000",
304 ;; "udp://192.168.0.111;224.1.1.1:5555".
306 (defcfun* ("zmq_bind" %bind) :int
307 "Bind the socket to a particular address.
309 Errors: EPROTONOSUPPORT - unsupported protocol.
310 ENOCOMPATPROTO - protocol is not compatible with the socket type."
311 (s :pointer)
312 (addr :pointer :char))
314 (defcfun* ("zmq_connect" %connect) :int
315 "Connect the socket to a particular address.
317 Errors: EPROTONOSUPPORT - unsupported protocol.
318 ENOCOMPATPROTO - protocol is not compatible with the socket type."
319 (s :pointer)
320 (addr :pointer :char))
322 ;; Sending and receiving messages.
323 ;; *******************************
325 (defconstant noblock 1
326 "The flag specifying that the operation should be performed in
327 non-blocking mode. I.e. if it cannot be processed immediately,
328 error should be returned with errno set to EAGAIN.")
330 (defconstant noflush 2
331 "The flag specifying that zmq_send should not flush the message downstream
332 immediately. Instead, it should batch ZMQ_NOFLUSH messages and send them
333 downstream only if zmq_flush is invoked. This is an optimisation for cases
334 where several messages are sent in a single business transaction. However,
335 the effect is measurable only in extremely high-perf scenarios
336 (million messages a second or so). If that's not your case, use standard
337 flushing send instead.")
339 (defcfun* ("zmq_send" %send) :int
340 "Send the message 'msg' to the socket 's'. 'flags' argument can be
341 combination the flags described above.
343 Errors: EAGAIN - message cannot be sent at the moment (applies only to
344 non-blocking send).
345 ENOTSUP - function isn't supported by particular socket type.
346 EFSM - function cannot be called at the moment."
347 (s :pointer)
348 (msg msg)
349 :optional
350 (flags :int))
352 (defcfun* ("zmq_flush" flush) :int
353 "Flush the messages that were send using ZMQ_NOFLUSH flag down the stream.
355 Errors: ENOTSUP - function isn't supported by particular socket type.
356 EFSM - function cannot be called at the moment."
357 (s :pointer))
359 (defcfun* ("zmq_recv" %recv) :int
360 "Receive a message from the socket 's'. 'flags' argument can be combination
361 of the flags described above with the exception of ZMQ_NOFLUSH.
363 Errors: EAGAIN - message cannot be received at the moment (applies only to
364 non-blocking receive).
365 ENOTSUP - function isn't supported by particular socket type.
366 EFSM - function cannot be called at the moment."
367 (s :pointer)
368 (msg msg)
369 :optional
370 (flags :int))
372 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
373 ;; I/O multiplexing.
374 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
376 (defconstant pollin 1)
377 (defconstant pollout 2)
379 (def-c-struct pollitem
380 "'socket' is a 0MQ socket we want to poll on. If set to NULL, native file
381 descriptor (socket) 'fd' will be used instead. 'events' defines event we
382 are going to poll on - combination of ZMQ_POLLIN and ZMQ_POLLOUT. Error
383 event does not exist for portability reasons. Errors from native sockets
384 are reported as ZMQ_POLLIN. It's client's responsibilty to identify the
385 error afterwards. 'revents' field is filled in after function returns. It's
386 a combination of ZMQ_POLLIN and/or ZMQ_POLLOUT depending on the state of the
387 socket."
388 (socket :pointer)
389 (fd :int)
390 (events :short)
391 (revents :short))
393 (defcfun* ("zmq_poll" poll) :int
394 "Polls for the items specified by 'items'. Number of items in the array is
395 determined by 'nitems' argument. Returns number of items signaled, -1
396 in the case of error.
398 Errors: EFAULT - there's a 0MQ socket in the pollset belonging to
399 a different thread.
400 ENOTSUP - 0MQ context was initialised without ZMQ_POLL flag.
401 I/O multiplexing is disabled."
402 (items pollitem)
403 (nitems :int))
405 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
406 ;; Helper functions.
407 ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
409 ;; Helper functions used by perf tests so that they don't have to care
410 ;; about minutiae of time-related functions on different OS platforms.
412 (defcfun ("zmq_stopwatch_start" stopwatch-start) :pointer
413 "Starts the stopwatch. Returns the handle to the watch")
415 (defcfun ("zmq_stopwatch_stop" stopwatch-stop) :ulong
416 "Stops the stopwatch. Returns the number of microseconds elapsed since
417 the stopwatch was started."
418 (watch :pointer))
420 (defcfun ("zmq_sleep" sleep) :void
421 "Sleeps for specified number of seconds."
422 (seconds :int))