OSX compilation fixes
[lumina.git] / core / src / output_queue_socket.c
blob4f800b2d5946730b3c54f44b3cc57bc6aa8b9f6a
1 #include "output_queue.h"
2 #include "output_queue_socket.h"
3 #include "output_queue_socket_internal.h"
5 #include "out_strategy/memory_socket.h"
7 #include <errno.h> /* errno */
9 #include <ev.h>
11 #include <sys/types.h>
12 #include <sys/socket.h> /* send */
14 #include <stdio.h> /* printf */
15 /* TODO: Setup generic 'queue' so that input and output are handled in same queue */
17 static void SocketOutput_add(OutputQueue *queue, DataItem *item, Target *target);
18 static void free_OutputQueue_Socket(OutputQueue *queue);
20 /* DEFAULT STRATEGIES */
21 static OutStrategy *strategies[2] = { NULL /* mem sock */, NULL };
23 static OutputQueue_ops OutputQueue_Socket_ops = {
24 SocketOutput_add,
25 free_OutputQueue_Socket
28 typedef struct OutputQueue_Socket {
29 OutputQueue base;
30 core_t *core;
31 } OutputQueue_Socket;
33 OutputQueue *new_OutputQueue_Socket(core_t *core) {
34 OutputQueue_Socket *queue = calloc(1, sizeof(OutputQueue_Socket));
35 if(!queue) goto fail;
36 /* XXX: Initialize the strategies if not already done */
37 if(!strategies[0])
38 strategies[0] = new_OutStrategy_Memory_Socket();
39 queue->base.ops = &OutputQueue_Socket_ops;
40 queue->core = core;
41 return (OutputQueue*)queue;
42 fail:
43 return NULL;
46 static int OutputQueue_Socket_writeItem(Target_Socket *target, int fd, DataItem *item) {
47 /* Unorthodox... more should be moved into the strategy.. */
48 OutStrategy **ptr;
49 for(ptr = strategies; ptr && *ptr; ptr++) {
50 OutStrategy_Result result = (*ptr)->put(*ptr, item, (Target*)target);
51 switch(result) {
52 case OS_RESULT_WAIT:
53 return 0;
54 case OS_RESULT_COMPLETE:
55 return 1;
56 case OS_RESULT_SKIP:
57 continue;
58 default:
59 /* AGK: dunno what do do.. */
60 continue;
63 /* AGK: No strategies handle this */
64 return 1;
67 static void OutputQueue_Socket_writeCallback(struct ev_loop *loop, struct ev_io *w, int revents) {
68 /* HANDLE DATA */
69 Target_Socket *target = (Target_Socket*)w->data;
70 DataItem_queue *queue = &target->queue;
71 DataItem *item;
72 while((item = STAILQ_FIRST(queue))) {
73 /* WRITE - 0 == not complete, needs to wait */
74 if(0 == OutputQueue_Socket_writeItem(target, w->fd, item))
75 break;
76 /* Remove the item from the queue and release */
77 STAILQ_REMOVE_HEAD(queue, entries);
78 free_DataItem(item);
83 Target *new_Target_Socket(OutputQueue *queue, int fd) {
84 /* Currently no timeout */
85 Target_Socket *target = calloc(1, sizeof(Target_Socket));
86 if(!target) goto fail;
87 target->base.typeID = TARGET_SOCKET;
88 /* INIT THE QUEUE */
89 STAILQ_INIT(&target->queue);
90 ev_io_init(&target->writable, OutputQueue_Socket_writeCallback, fd, EV_WRITE);
91 target->writable.data = target;
92 /* don't start until data queued... */
93 return (Target*)target;
94 fail:
95 if(target) free_Target_Socket(queue, (Target*)target);
96 return NULL;
98 void free_Target_Socket(OutputQueue *queue, Target *target) {
99 assert(target->typeID == TARGET_SOCKET);
100 Target_Socket *t = (Target_Socket*)target;
101 DataItem *item;
102 DataItem_queue *q = &t->queue;
103 /* Make sure IO listener stopped */
104 if(ev_is_active(&t->writable))
105 ev_io_stop(((OutputQueue_Socket*)queue)->core->loop, &t->writable);
106 /* Release all items in the queue */
107 /* Cannot use foreach because it expects the item to exist for the next iteration */
108 while((item = STAILQ_FIRST(q))) {
109 STAILQ_REMOVE_HEAD(q, entries);
110 free_DataItem(item);
112 free(t);
115 static void SocketOutput_add(OutputQueue *queue, DataItem *item, Target *target) {
116 Target_Socket *t = (Target_Socket*)target;
117 STAILQ_INSERT_HEAD(&t->queue, item, entries);
118 if(!ev_is_active(&t->writable))
119 ev_io_start(((OutputQueue_Socket*)queue)->core->loop, &t->writable);
121 static void free_OutputQueue_Socket(OutputQueue *queue) {
122 free(queue);