1 #include "output_queue.h"
2 #include "output_queue_socket.h"
3 #include "output_queue_socket_internal.h"
5 #include "out_strategy/memory_socket.h"
7 #include <errno.h> /* errno */
11 #include <sys/types.h>
12 #include <sys/socket.h> /* send */
14 #include <stdio.h> /* printf */
15 /* TODO: Set up a generic 'queue' so that input and output are handled in the same queue */
/* Forward declarations of file-local functions that are defined further
 * down in this file. */
17 static void SocketOutput_add(OutputQueue
*queue
, DataItem
*item
, Target
*target
);
18 static void free_OutputQueue_Socket(OutputQueue
*queue
);
20 /* DEFAULT STRATEGIES */
/* File-local table of output strategies.
 * Slot 0 starts out NULL and is assigned the memory-socket strategy by
 * new_OutputQueue_Socket(). OutputQueue_Socket_writeItem() walks the table
 * until it meets a NULL entry, so the last slot must stay NULL to terminate
 * that walk. */
21 static OutStrategy
*strategies
[2] = { NULL
/* mem sock */, NULL
};
/* Operations table for the socket-backed output queue; its address is stored
 * in queue->base.ops by new_OutputQueue_Socket().
 * NOTE(review): only the free_OutputQueue_Socket entry is visible in this
 * view; the remaining initializer entries should be checked against the
 * complete file. */
23 static OutputQueue_ops OutputQueue_Socket_ops
= {
25 free_OutputQueue_Socket
/* Socket-backed output queue.
 * NOTE(review): the member list is not visible in this view. Code in this
 * file accesses a `base` member (with an `ops` field) and a `core` member
 * (with a `loop` field); confirm the layout against the complete file. */
28 typedef struct OutputQueue_Socket
{
/* Create a socket-backed output queue.
 *
 * Allocates a zero-initialised OutputQueue_Socket, stores a memory-socket
 * strategy in strategies[0], points the queue's ops at
 * OutputQueue_Socket_ops and returns the queue as a base OutputQueue pointer.
 *
 * NOTE(review): some lines of this function are not visible in this view.
 * In the visible code the calloc() result is dereferenced without a NULL
 * check, strategies[0] is assigned on every call, and `core` is never stored
 * in the queue even though other functions read queue->core->loop. Confirm
 * each of these against the complete file. */
33 OutputQueue
*new_OutputQueue_Socket(core_t
*core
) {
34 OutputQueue_Socket
*queue
= calloc(1, sizeof(OutputQueue_Socket
));
36 /* XXX: Initialize the strategies if not already done */
38 strategies
[0] = new_OutStrategy_Memory_Socket();
39 queue
->base
.ops
= &OutputQueue_Socket_ops
;
41 return (OutputQueue
*)queue
;
/* Try to write one data item to a socket target.
 *
 * Walks the NULL-terminated `strategies` table and calls each strategy's
 * put() with the item and the target (cast to the base Target type), then
 * dispatches on the returned OutStrategy_Result.
 *
 * The caller treats a return value of 0 as "not complete, needs to wait".
 * NOTE(review): most of the result handling and all return statements are
 * not visible in this view, and neither is any use of `fd`; confirm the
 * return contract against the complete file. */
46 static int OutputQueue_Socket_writeItem(Target_Socket
*target
, int fd
, DataItem
*item
) {
47 /* Unorthodox... more should be moved into the strategy.. */
49 for(ptr
= strategies
; ptr
&& *ptr
; ptr
++) {
50 OutStrategy_Result result
= (*ptr
)->put(*ptr
, item
, (Target
*)target
);
54 case OS_RESULT_COMPLETE
:
59 /* AGK: not sure what to do here.. */
63 /* AGK: No strategies handle this */
/* ev_io callback registered for EV_WRITE on a socket target's descriptor.
 *
 * Recovers the Target_Socket from w->data and processes its pending queue
 * from the head: each head item is handed to OutputQueue_Socket_writeItem()
 * together with w->fd, and items are removed from the head of the queue.
 *
 * NOTE(review): the statement controlled by the `0 ==` test and the code
 * that releases a removed item are not visible in this view. */
67 static void OutputQueue_Socket_writeCallback(struct ev_loop
*loop
, struct ev_io
*w
, int revents
) {
69 Target_Socket
*target
= (Target_Socket
*)w
->data
;
70 DataItem_queue
*queue
= &target
->queue
;
72 while((item
= STAILQ_FIRST(queue
))) {
73 /* WRITE - 0 == not complete, needs to wait */
74 if(0 == OutputQueue_Socket_writeItem(target
, w
->fd
, item
))
76 /* Remove the item from the queue and release */
77 STAILQ_REMOVE_HEAD(queue
, entries
);
/* Create a socket target that writes to descriptor `fd`.
 *
 * Allocates a zero-initialised Target_Socket, tags it as TARGET_SOCKET,
 * initialises its pending-item queue as empty and prepares (but does not
 * start) an EV_WRITE watcher on `fd` whose callback is
 * OutputQueue_Socket_writeCallback and whose data pointer is the target.
 * Returns the target as a base Target pointer.
 *
 * If the allocation fails, control jumps to the `fail` label.
 * NOTE(review): the label itself and whatever is returned on that path are
 * not visible in this view. */
83 Target
*new_Target_Socket(OutputQueue
*queue
, int fd
) {
84 /* Currently no timeout */
85 Target_Socket
*target
= calloc(1, sizeof(Target_Socket
));
86 if(!target
) goto fail
;
87 target
->base
.typeID
= TARGET_SOCKET
;
89 STAILQ_INIT(&target
->queue
);
90 ev_io_init(&target
->writable
, OutputQueue_Socket_writeCallback
, fd
, EV_WRITE
);
91 target
->writable
.data
= target
;
92 /* don't start until data queued... */
93 return (Target
*)target
;
95 if(target
) free_Target_Socket(queue
, (Target
*)target
);
/* Tear down a socket target.
 *
 * Asserts that `target` is tagged TARGET_SOCKET, stops its write watcher on
 * the queue's core event loop if the watcher is active, then empties the
 * pending-item queue by repeatedly removing the head item.
 *
 * `queue` must really be an OutputQueue_Socket: it is cast to that type to
 * reach core->loop.
 * NOTE(review): the per-item release and the release of the target itself
 * are not visible in this view. */
98 void free_Target_Socket(OutputQueue
*queue
, Target
*target
) {
99 assert(target
->typeID
== TARGET_SOCKET
);
100 Target_Socket
*t
= (Target_Socket
*)target
;
102 DataItem_queue
*q
= &t
->queue
;
103 /* Make sure IO listener stopped */
104 if(ev_is_active(&t
->writable
))
105 ev_io_stop(((OutputQueue_Socket
*)queue
)->core
->loop
, &t
->writable
);
106 /* Release all items in the queue */
107 /* Cannot use foreach because it expects the item to exist for the next iteration */
108 while((item
= STAILQ_FIRST(q
))) {
109 STAILQ_REMOVE_HEAD(q
, entries
);
115 static void SocketOutput_add(OutputQueue
*queue
, DataItem
*item
, Target
*target
) {
116 Target_Socket
*t
= (Target_Socket
*)target
;
117 STAILQ_INSERT_HEAD(&t
->queue
, item
, entries
);
118 if(!ev_is_active(&t
->writable
))
119 ev_io_start(((OutputQueue_Socket
*)queue
)->core
->loop
, &t
->writable
);
121 static void free_OutputQueue_Socket(OutputQueue
*queue
) {