cleanups
[lwes-ruby.git] / ext / lwes_ext / emitter.c
blob a7e5883ef55385bcc3326995735546c361f74e72
1 #include "lwes_ruby.h"
3 static VALUE ENC; /* LWES_ENCODING */
4 static ID id_TYPE_DB, id_TYPE_LIST, id_NAME, id_HAVE_ENCODING;
5 static ID id_new, id_enc, id_size;
6 static VALUE sym_enc;
8 static void dump_name(char *name, LWES_BYTE_P buf, size_t *off)
10 if (marshall_SHORT_STRING(name, buf, MAX_MSG_SIZE, off) > 0)
11 return;
12 rb_raise(rb_eRuntimeError, "failed to dump name=%s", name);
15 static int dump_bool(char *name, VALUE val, LWES_BYTE_P buf, size_t *off)
17 LWES_BOOLEAN tmp = FALSE;
19 if (val == Qtrue) {
20 tmp = TRUE;
21 } else if (val != Qfalse) {
22 volatile VALUE raise_inspect;
24 rb_raise(rb_eTypeError, "non-boolean set for %s: %s",
25 name, RAISE_INSPECT(val));
27 dump_name(name, buf, off);
28 lwesrb_dump_type(LWES_BOOLEAN_TOKEN, buf, off);
29 return marshall_BOOLEAN(tmp, buf, MAX_MSG_SIZE, off);
32 static int dump_string(char *name, VALUE val, LWES_BYTE_P buf, size_t *off)
34 char *dst;
36 switch (TYPE(val)) {
37 case T_BIGNUM:
38 case T_FIXNUM:
39 val = rb_obj_as_string(val);
41 dst = StringValueCStr(val);
43 dump_name(name, buf, off);
44 lwesrb_dump_type(LWES_STRING_TOKEN, buf, off);
45 return marshall_LONG_STRING(dst, buf, MAX_MSG_SIZE, off);
48 static void dump_enc(VALUE enc, LWES_BYTE_P buf, size_t *off)
50 dump_name(LWES_ENCODING, buf, off);
51 lwesrb_dump_num(LWES_INT_16_TOKEN, enc, buf, off);
/*
 * Returns a copy of the NUL-terminated string +str+ allocated with
 * xmalloc(); the caller owns the copy and must release it with xfree().
 * +str+ must not be NULL.
 */
static char *my_strdup(const char *str)
{
	/* size_t, not long: strlen() returns size_t and xmalloc() takes one */
	size_t len = strlen(str) + 1;
	char *rv = xmalloc(len);

	memcpy(rv, str, len);

	return rv;
}
64 /* the underlying struct for LWES::Emitter */
65 struct _rb_lwes_emitter {
66 struct lwes_emitter *emitter;
67 char *address;
68 char *iface;
69 LWES_U_INT_32 port;
70 LWES_BOOLEAN emit_heartbeat;
71 LWES_INT_16 freq;
72 LWES_U_INT_32 ttl;
75 /* gets the _rb_lwes_emitter struct pointer from self */
76 static struct _rb_lwes_emitter * _rle(VALUE self)
78 struct _rb_lwes_emitter *rle;
80 Data_Get_Struct(self, struct _rb_lwes_emitter, rle);
82 return rle;
85 /* GC automatically calls this when object is finalized */
86 static void rle_free(void *ptr)
88 struct _rb_lwes_emitter *rle = ptr;
90 if (rle->emitter)
91 lwes_emitter_destroy(rle->emitter);
92 xfree(rle->address);
93 xfree(rle->iface);
94 xfree(ptr);
97 /* called by the GC when object is allocated */
98 static VALUE rle_alloc(VALUE klass)
100 struct _rb_lwes_emitter *rle;
102 return Data_Make_Struct(klass, struct _rb_lwes_emitter,
103 NULL, rle_free, rle);
107 * kv - Array:
108 * key => String,
109 * key => [ numeric_type, Numeric ],
110 * key => true,
111 * key => false,
112 * memo - lwes_event pointer
114 static VALUE event_hash_iter_i(VALUE kv, VALUE memo)
116 volatile VALUE raise_inspect;
117 VALUE *tmp = (VALUE *)memo;
118 VALUE val;
119 VALUE name;
120 char *attr_name;
121 int rv = 0;
122 LWES_BYTE_P buf = (LWES_BYTE_P)tmp[0];
123 size_t *off = (size_t *)tmp[1];
125 if (TYPE(kv) != T_ARRAY || RARRAY_LEN(kv) != 2)
126 rb_raise(rb_eTypeError,
127 "hash iteration not giving key-value pairs");
128 tmp = RARRAY_PTR(kv);
129 name = tmp[0];
131 if (name == sym_enc) return Qnil; /* already dumped first */
133 name = rb_obj_as_string(name);
134 attr_name = StringValueCStr(name);
136 if (strcmp(attr_name, LWES_ENCODING) == 0)
137 return Qnil;
139 val = tmp[1];
141 switch (TYPE(val)) {
142 case T_TRUE:
143 case T_FALSE:
144 rv = dump_bool(attr_name, val, buf, off);
145 break;
146 case T_ARRAY:
147 dump_name(attr_name, buf, off);
148 lwesrb_dump_num_ary(val, buf, off);
149 return Qnil;
150 case T_STRING:
151 rv = dump_string(attr_name, val, buf, off);
152 break;
155 if (rv > 0)
156 return Qnil;
158 rb_raise(rb_eArgError, "unhandled type %s=%s",
159 attr_name, RAISE_INSPECT(val));
160 return Qfalse;
163 static VALUE emit_hash(VALUE self, VALUE name, VALUE event)
165 struct _rb_lwes_emitter *rle = _rle(self);
166 LWES_BYTE_P buf = rle->emitter->buffer;
167 VALUE tmp[2];
168 size_t off = 0;
169 VALUE enc;
170 int size = NUM2INT(rb_funcall(event, id_size, 0, 0));
171 int rv;
172 char *event_name = StringValueCStr(name);
174 tmp[0] = (VALUE)buf;
175 tmp[1] = (VALUE)&off;
177 if (size < 0 || size > UINT16_MAX)
178 rb_raise(rb_eRangeError, "hash size out of uint16 range");
180 /* event name first */
181 dump_name(event_name, buf, &off);
183 /* number of attributes second */
184 rv = marshall_U_INT_16((LWES_U_INT_16)size, buf, MAX_MSG_SIZE, &off);
185 if (rv <= 0)
186 rb_raise(rb_eRuntimeError, "failed to dump num_attrs");
188 /* dump encoding before other fields */
189 enc = rb_hash_aref(event, sym_enc);
190 if (NIL_P(enc))
191 enc = rb_hash_aref(event, ENC);
192 if (! NIL_P(enc))
193 dump_enc(enc, buf, &off);
195 /* the rest of the fields */
196 rb_iterate(rb_each, event, event_hash_iter_i, (VALUE)&tmp);
198 if (lwes_emitter_emit_bytes(rle->emitter, buf, off) < 0)
199 rb_raise(rb_eRuntimeError, "failed to emit event");
201 return event;
204 static void
205 marshal_field(
206 char *name,
207 LWES_TYPE type,
208 VALUE val,
209 LWES_BYTE_P buf,
210 size_t *off)
212 volatile VALUE raise_inspect;
214 switch (type) {
215 case LWES_TYPE_STRING:
216 if (dump_string(name, val, buf, off) > 0)
217 return;
218 break;
219 case LWES_TYPE_BOOLEAN:
220 if (dump_bool(name, val, buf, off) > 0)
221 return;
222 break;
223 default:
224 dump_name(name, buf, off);
225 lwesrb_dump_num(type, val, buf, off);
226 return;
229 rb_raise(rb_eRuntimeError, "failed to set %s=%s",
230 name, RAISE_INSPECT(val));
233 static void lwes_struct_class(
234 VALUE *event_class,
235 VALUE *name,
236 VALUE *type_list,
237 VALUE *have_enc,
238 VALUE event)
240 VALUE type_db;
242 *event_class = CLASS_OF(event);
243 type_db = rb_const_get(*event_class, id_TYPE_DB);
245 if (CLASS_OF(type_db) != cLWES_TypeDB)
246 rb_raise(rb_eArgError, "class does not have valid TYPE_DB");
248 *name = rb_const_get(*event_class, id_NAME);
249 Check_Type(*name, T_STRING);
250 *type_list = rb_const_get(*event_class, id_TYPE_LIST);
251 Check_Type(*type_list, T_ARRAY);
253 *have_enc = rb_const_get(*event_class, id_HAVE_ENCODING);
256 static VALUE emit_struct(VALUE self, VALUE event)
258 VALUE event_class, name, type_list, have_enc;
259 struct _rb_lwes_emitter *rle = _rle(self);
260 LWES_BYTE_P buf = rle->emitter->buffer;
261 size_t off = 0;
262 long i;
263 VALUE *tmp;
264 LWES_U_INT_16 num_attr = 0;
265 size_t num_attr_off;
266 VALUE *flds;
267 char *str;
269 lwes_struct_class(&event_class, &name, &type_list, &have_enc, event);
271 /* event name */
272 str = StringValueCStr(name);
273 dump_name(str, buf, &off);
275 /* number of attributes, use a placeholder until we've iterated */
276 num_attr_off = off;
277 if (marshall_U_INT_16(0, buf, MAX_MSG_SIZE, &off) < 0)
278 rb_raise(rb_eRuntimeError,
279 "failed to marshal number_of_attributes");
281 /* dump encoding before other fields */
282 if (have_enc == Qtrue) {
283 VALUE enc = rb_funcall(event, id_enc, 0, 0);
284 if (! NIL_P(enc)) {
285 ++num_attr;
286 dump_enc(enc, buf, &off);
290 i = RARRAY_LEN(type_list);
291 flds = RSTRUCT_PTR(event);
292 tmp = RARRAY_PTR(type_list);
293 for (; --i >= 0; tmp++, flds++) {
294 /* inner: [ :field_sym, "field_name", type ] */
295 VALUE *inner = RARRAY_PTR(*tmp);
296 VALUE val;
297 LWES_TYPE type;
299 if (inner[0] == sym_enc) /* encoding was already dumped */
300 continue;
302 val = *flds;
303 if (NIL_P(val))
304 continue; /* LWES doesn't know nil */
306 str = StringValueCStr(inner[1]);
307 type = NUM2INT(inner[2]);
308 ++num_attr;
309 marshal_field(str, type, val, buf, &off);
312 /* now we've iterated, we can accurately give num_attr */
313 if (marshall_U_INT_16(num_attr, buf, MAX_MSG_SIZE, &num_attr_off) <= 0)
314 rb_raise(rb_eRuntimeError, "failed to marshal num_attr");
316 if (lwes_emitter_emit_bytes(rle->emitter, buf, off) < 0)
317 rb_raise(rb_eRuntimeError, "failed to emit event");
319 return event;
322 static VALUE emit_event(VALUE self, VALUE event)
324 struct lwes_event *e = lwesrb_get_event(event);
326 if (lwes_emitter_emit(_rle(self)->emitter, e) < 0)
327 rb_raise(rb_eRuntimeError, "failed to emit event");
329 return event;
332 * call-seq:
333 * emitter = LWES::Emitter.new
334 * event = EventStruct.new
335 * event.foo = "bar"
336 * emitter << event
338 static VALUE emitter_ltlt(VALUE self, VALUE event)
340 if (rb_obj_is_kind_of(event, cLWES_Event)) {
341 return emit_event(self, event);
342 } else {
343 Check_Type(event, T_STRUCT);
345 return emit_struct(self, event);
350 * call-seq:
351 * emitter = LWES::Emitter.new
353 * emitter.emit("EventName", :foo => "HI")
355 * emitter.emit(EventStruct, :foo => "HI")
357 * struct = EventStruct.new
358 * struct.foo = "HI"
359 * emitter.emit(struct)
361 static VALUE emitter_emit(int argc, VALUE *argv, VALUE self)
363 volatile VALUE raise_inspect;
364 VALUE name = Qnil;
365 VALUE event = Qnil;
366 argc = rb_scan_args(argc, argv, "11", &name, &event);
368 switch (TYPE(name)) {
369 case T_STRING:
370 if (TYPE(event) == T_HASH)
371 return emit_hash(self, name, event);
372 rb_raise(rb_eTypeError,
373 "second argument must be a hash when first "
374 "is a String");
375 case T_STRUCT:
376 if (argc >= 2)
377 rb_raise(rb_eArgError,
378 "second argument not allowed when first"
379 " is a Struct");
380 event = name;
381 return emit_struct(self, event);
382 case T_CLASS:
383 if (TYPE(event) != T_HASH)
384 rb_raise(rb_eTypeError,
385 "second argument must be a Hash when first"
386 " is a Class");
389 * we can optimize this so there's no intermediate
390 * struct created
392 event = rb_funcall(name, id_new, 1, event);
393 return emit_struct(self, event);
394 default:
395 if (rb_obj_is_kind_of(name, cLWES_Event))
396 return emit_event(self, name);
397 rb_raise(rb_eArgError,
398 "bad argument: %s, must be a String, Struct or Class",
399 RAISE_INSPECT(name));
402 assert(0 && "should never get here");
403 return event;
407 * Destroys the associated lwes_emitter and the associated socket. This
408 * method is rarely needed as Ruby garbage collection will take care of
409 * closing for you, but may be useful in odd cases when it is desirable
410 * to release file descriptors ASAP.
412 static VALUE emitter_close(VALUE self)
414 struct _rb_lwes_emitter *rle = _rle(self);
416 if (rle->emitter)
417 lwes_emitter_destroy(rle->emitter);
418 rle->emitter = NULL;
420 return Qnil;
423 static void lwesrb_emitter_create(struct _rb_lwes_emitter *rle)
425 int gc_retry = 1;
426 retry:
427 if (rle->ttl == UINT32_MAX)
428 rle->emitter = lwes_emitter_create(
429 rle->address, rle->iface, rle->port,
430 rle->emit_heartbeat, rle->freq);
431 else
432 rle->emitter = lwes_emitter_create_with_ttl(
433 rle->address, rle->iface, rle->port,
434 rle->emit_heartbeat, rle->freq, rle->ttl);
436 if (!rle->emitter) {
437 if (--gc_retry == 0) {
438 rb_gc();
439 goto retry;
441 rb_raise(rb_eRuntimeError, "failed to create LWES emitter");
445 /* :nodoc: */
446 static VALUE init_copy(VALUE dest, VALUE obj)
448 struct _rb_lwes_emitter *dst = _rle(dest);
449 struct _rb_lwes_emitter *src = _rle(obj);
451 memcpy(dst, src, sizeof(*dst));
452 dst->address = my_strdup(src->address);
453 if (dst->iface)
454 dst->iface = my_strdup(src->iface);
455 lwesrb_emitter_create(dst);
457 assert(dst->emitter && dst->emitter != src->emitter &&
458 "emitter not a copy");
460 return dest;
463 /* should only used internally by #initialize */
464 static VALUE _create(VALUE self, VALUE options)
466 struct _rb_lwes_emitter *rle = _rle(self);
467 VALUE address, iface, port, heartbeat, ttl;
469 rle->emit_heartbeat = FALSE;
470 rle->freq = 0;
471 rle->ttl = UINT32_MAX; /* nobody sets a ttl this long, right? */
473 if (rle->emitter)
474 rb_raise(rb_eRuntimeError, "already created lwes_emitter");
475 if (TYPE(options) != T_HASH)
476 rb_raise(rb_eTypeError, "options must be a hash");
478 address = rb_hash_aref(options, ID2SYM(rb_intern("address")));
479 if (TYPE(address) != T_STRING)
480 rb_raise(rb_eTypeError, ":address must be a string");
481 rle->address = my_strdup(StringValueCStr(address));
483 iface = rb_hash_aref(options, ID2SYM(rb_intern("iface")));
484 switch (TYPE(iface)) {
485 case T_NIL:
486 rle->iface = NULL;
487 break;
488 case T_STRING:
489 rle->iface = my_strdup(StringValueCStr(iface));
490 break;
491 default:
492 rb_raise(rb_eTypeError, ":iface must be a String or nil");
495 port = rb_hash_aref(options, ID2SYM(rb_intern("port")));
496 if (TYPE(port) != T_FIXNUM)
497 rb_raise(rb_eTypeError, ":port must be a Fixnum");
498 rle->port = NUM2UINT(port);
500 heartbeat = rb_hash_aref(options, ID2SYM(rb_intern("heartbeat")));
501 if (TYPE(heartbeat) == T_FIXNUM) {
502 int tmp = NUM2INT(heartbeat);
503 if (tmp > INT16_MAX)
504 rb_raise(rb_eArgError,":heartbeat > INT16_MAX seconds");
505 rle->emit_heartbeat = TRUE;
506 rle->freq = (LWES_INT_16)tmp;
507 } else if (NIL_P(heartbeat)) { /* do nothing, use defaults */
508 } else
509 rb_raise(rb_eTypeError, ":heartbeat must be a Fixnum or nil");
511 ttl = rb_hash_aref(options, ID2SYM(rb_intern("ttl")));
512 if (TYPE(ttl) == T_FIXNUM) {
513 unsigned LONG_LONG tmp = NUM2ULL(ttl);
514 if (tmp >= UINT32_MAX)
515 rb_raise(rb_eArgError, ":ttl >= UINT32_MAX seconds");
516 rle->ttl = (LWES_U_INT_32)tmp;
517 } else if (NIL_P(ttl)) { /* do nothing, no ttl */
518 } else
519 rb_raise(rb_eTypeError, ":ttl must be a Fixnum or nil");
521 lwesrb_emitter_create(rle);
523 return self;
526 /* Init_lwes_ext will call this */
527 void lwesrb_init_emitter(void)
529 VALUE mLWES = rb_define_module("LWES");
530 VALUE cLWES_Emitter =
531 rb_define_class_under(mLWES, "Emitter", rb_cObject);
533 rb_define_method(cLWES_Emitter, "<<", emitter_ltlt, 1);
534 rb_define_method(cLWES_Emitter, "emit", emitter_emit, -1);
535 rb_define_method(cLWES_Emitter, "_create", _create, 1);
536 rb_define_method(cLWES_Emitter, "close", emitter_close, 0);
537 rb_define_method(cLWES_Emitter, "initialize_copy", init_copy, 1);
538 rb_define_alloc_func(cLWES_Emitter, rle_alloc);
539 LWESRB_MKID(TYPE_DB);
540 LWESRB_MKID(TYPE_LIST);
541 LWESRB_MKID(NAME);
542 LWESRB_MKID(HAVE_ENCODING);
543 LWESRB_MKID(new);
544 LWESRB_MKID(size);
545 id_enc = rb_intern(LWES_ENCODING);
546 sym_enc = ID2SYM(id_enc);
548 ENC = rb_obj_freeze(rb_str_new2(LWES_ENCODING));
549 rb_define_const(mLWES, "ENCODING", ENC);