3 static VALUE ENC
; /* LWES_ENCODING */
4 static ID id_TYPE_DB
, id_TYPE_LIST
, id_NAME
, id_HAVE_ENCODING
;
5 static ID id_new
, id_enc
, id_size
;
8 static void dump_name(char *name
, LWES_BYTE_P buf
, size_t *off
)
10 if (marshall_SHORT_STRING(name
, buf
, MAX_MSG_SIZE
, off
) > 0)
12 rb_raise(rb_eRuntimeError
, "failed to dump name=%s", name
);
15 static int dump_bool(char *name
, VALUE val
, LWES_BYTE_P buf
, size_t *off
)
17 LWES_BOOLEAN tmp
= FALSE
;
21 } else if (val
!= Qfalse
) {
22 volatile VALUE raise_inspect
;
24 rb_raise(rb_eTypeError
, "non-boolean set for %s: %s",
25 name
, RAISE_INSPECT(val
));
27 dump_name(name
, buf
, off
);
28 lwesrb_dump_type(LWES_BOOLEAN_TOKEN
, buf
, off
);
29 return marshall_BOOLEAN(tmp
, buf
, MAX_MSG_SIZE
, off
);
32 static int dump_string(char *name
, VALUE val
, LWES_BYTE_P buf
, size_t *off
)
39 val
= rb_obj_as_string(val
);
41 dst
= StringValueCStr(val
);
43 dump_name(name
, buf
, off
);
44 lwesrb_dump_type(LWES_STRING_TOKEN
, buf
, off
);
45 return marshall_LONG_STRING(dst
, buf
, MAX_MSG_SIZE
, off
);
48 static void dump_enc(VALUE enc
, LWES_BYTE_P buf
, size_t *off
)
50 dump_name(LWES_ENCODING
, buf
, off
);
51 lwesrb_dump_num(LWES_INT_16_TOKEN
, enc
, buf
, off
);
54 static char *my_strdup(const char *str
)
56 long len
= strlen(str
) + 1;
57 char *rv
= xmalloc(len
);
64 /* the underlying struct for LWES::Emitter */
65 struct _rb_lwes_emitter
{
66 struct lwes_emitter
*emitter
;
70 LWES_BOOLEAN emit_heartbeat
;
75 /* gets the _rb_lwes_emitter struct pointer from self */
76 static struct _rb_lwes_emitter
* _rle(VALUE self
)
78 struct _rb_lwes_emitter
*rle
;
80 Data_Get_Struct(self
, struct _rb_lwes_emitter
, rle
);
85 /* GC automatically calls this when object is finalized */
86 static void rle_free(void *ptr
)
88 struct _rb_lwes_emitter
*rle
= ptr
;
91 lwes_emitter_destroy(rle
->emitter
);
97 /* called by the GC when object is allocated */
98 static VALUE
rle_alloc(VALUE klass
)
100 struct _rb_lwes_emitter
*rle
;
102 return Data_Make_Struct(klass
, struct _rb_lwes_emitter
,
103 NULL
, rle_free
, rle
);
109 * key => [ numeric_type, Numeric ],
112 * memo - lwes_event pointer
114 static VALUE
event_hash_iter_i(VALUE kv
, VALUE memo
)
116 volatile VALUE raise_inspect
;
117 VALUE
*tmp
= (VALUE
*)memo
;
122 LWES_BYTE_P buf
= (LWES_BYTE_P
)tmp
[0];
123 size_t *off
= (size_t *)tmp
[1];
125 if (TYPE(kv
) != T_ARRAY
|| RARRAY_LEN(kv
) != 2)
126 rb_raise(rb_eTypeError
,
127 "hash iteration not giving key-value pairs");
128 tmp
= RARRAY_PTR(kv
);
131 if (name
== sym_enc
) return Qnil
; /* already dumped first */
133 name
= rb_obj_as_string(name
);
134 attr_name
= StringValueCStr(name
);
136 if (strcmp(attr_name
, LWES_ENCODING
) == 0)
144 rv
= dump_bool(attr_name
, val
, buf
, off
);
147 dump_name(attr_name
, buf
, off
);
148 lwesrb_dump_num_ary(val
, buf
, off
);
151 rv
= dump_string(attr_name
, val
, buf
, off
);
158 rb_raise(rb_eArgError
, "unhandled type %s=%s",
159 attr_name
, RAISE_INSPECT(val
));
163 static VALUE
emit_hash(VALUE self
, VALUE name
, VALUE event
)
165 struct _rb_lwes_emitter
*rle
= _rle(self
);
166 LWES_BYTE_P buf
= rle
->emitter
->buffer
;
170 int size
= NUM2INT(rb_funcall(event
, id_size
, 0, 0));
172 char *event_name
= StringValueCStr(name
);
175 tmp
[1] = (VALUE
)&off
;
177 if (size
< 0 || size
> UINT16_MAX
)
178 rb_raise(rb_eRangeError
, "hash size out of uint16 range");
180 /* event name first */
181 dump_name(event_name
, buf
, &off
);
183 /* number of attributes second */
184 rv
= marshall_U_INT_16((LWES_U_INT_16
)size
, buf
, MAX_MSG_SIZE
, &off
);
186 rb_raise(rb_eRuntimeError
, "failed to dump num_attrs");
188 /* dump encoding before other fields */
189 enc
= rb_hash_aref(event
, sym_enc
);
191 enc
= rb_hash_aref(event
, ENC
);
193 dump_enc(enc
, buf
, &off
);
195 /* the rest of the fields */
196 rb_iterate(rb_each
, event
, event_hash_iter_i
, (VALUE
)&tmp
);
198 if (lwes_emitter_emit_bytes(rle
->emitter
, buf
, off
) < 0)
199 rb_raise(rb_eRuntimeError
, "failed to emit event");
212 volatile VALUE raise_inspect
;
215 case LWES_TYPE_STRING
:
216 if (dump_string(name
, val
, buf
, off
) > 0)
219 case LWES_TYPE_BOOLEAN
:
220 if (dump_bool(name
, val
, buf
, off
) > 0)
224 dump_name(name
, buf
, off
);
225 lwesrb_dump_num(type
, val
, buf
, off
);
229 rb_raise(rb_eRuntimeError
, "failed to set %s=%s",
230 name
, RAISE_INSPECT(val
));
233 static void lwes_struct_class(
242 *event_class
= CLASS_OF(event
);
243 type_db
= rb_const_get(*event_class
, id_TYPE_DB
);
245 if (CLASS_OF(type_db
) != cLWES_TypeDB
)
246 rb_raise(rb_eArgError
, "class does not have valid TYPE_DB");
248 *name
= rb_const_get(*event_class
, id_NAME
);
249 Check_Type(*name
, T_STRING
);
250 *type_list
= rb_const_get(*event_class
, id_TYPE_LIST
);
251 Check_Type(*type_list
, T_ARRAY
);
253 *have_enc
= rb_const_get(*event_class
, id_HAVE_ENCODING
);
256 static VALUE
emit_struct(VALUE self
, VALUE event
)
258 VALUE event_class
, name
, type_list
, have_enc
;
259 struct _rb_lwes_emitter
*rle
= _rle(self
);
260 LWES_BYTE_P buf
= rle
->emitter
->buffer
;
264 LWES_U_INT_16 num_attr
= 0;
269 lwes_struct_class(&event_class
, &name
, &type_list
, &have_enc
, event
);
272 str
= StringValueCStr(name
);
273 dump_name(str
, buf
, &off
);
275 /* number of attributes, use a placeholder until we've iterated */
277 if (marshall_U_INT_16(0, buf
, MAX_MSG_SIZE
, &off
) < 0)
278 rb_raise(rb_eRuntimeError
,
279 "failed to marshal number_of_attributes");
281 /* dump encoding before other fields */
282 if (have_enc
== Qtrue
) {
283 VALUE enc
= rb_funcall(event
, id_enc
, 0, 0);
286 dump_enc(enc
, buf
, &off
);
290 i
= RARRAY_LEN(type_list
);
291 flds
= RSTRUCT_PTR(event
);
292 tmp
= RARRAY_PTR(type_list
);
293 for (; --i
>= 0; tmp
++, flds
++) {
294 /* inner: [ :field_sym, "field_name", type ] */
295 VALUE
*inner
= RARRAY_PTR(*tmp
);
299 if (inner
[0] == sym_enc
) /* encoding was already dumped */
304 continue; /* LWES doesn't know nil */
306 str
= StringValueCStr(inner
[1]);
307 type
= NUM2INT(inner
[2]);
309 marshal_field(str
, type
, val
, buf
, &off
);
312 /* now we've iterated, we can accurately give num_attr */
313 if (marshall_U_INT_16(num_attr
, buf
, MAX_MSG_SIZE
, &num_attr_off
) <= 0)
314 rb_raise(rb_eRuntimeError
, "failed to marshal num_attr");
316 if (lwes_emitter_emit_bytes(rle
->emitter
, buf
, off
) < 0)
317 rb_raise(rb_eRuntimeError
, "failed to emit event");
322 static VALUE
emit_event(VALUE self
, VALUE event
)
324 struct lwes_event
*e
= lwesrb_get_event(event
);
326 if (lwes_emitter_emit(_rle(self
)->emitter
, e
) < 0)
327 rb_raise(rb_eRuntimeError
, "failed to emit event");
333 * emitter = LWES::Emitter.new
334 * event = EventStruct.new
338 static VALUE
emitter_ltlt(VALUE self
, VALUE event
)
340 if (rb_obj_is_kind_of(event
, cLWES_Event
)) {
341 return emit_event(self
, event
);
343 Check_Type(event
, T_STRUCT
);
345 return emit_struct(self
, event
);
351 * emitter = LWES::Emitter.new
353 * emitter.emit("EventName", :foo => "HI")
355 * emitter.emit(EventStruct, :foo => "HI")
357 * struct = EventStruct.new
359 * emitter.emit(struct)
361 static VALUE
emitter_emit(int argc
, VALUE
*argv
, VALUE self
)
363 volatile VALUE raise_inspect
;
366 argc
= rb_scan_args(argc
, argv
, "11", &name
, &event
);
368 switch (TYPE(name
)) {
370 if (TYPE(event
) == T_HASH
)
371 return emit_hash(self
, name
, event
);
372 rb_raise(rb_eTypeError
,
373 "second argument must be a hash when first "
377 rb_raise(rb_eArgError
,
378 "second argument not allowed when first"
381 return emit_struct(self
, event
);
383 if (TYPE(event
) != T_HASH
)
384 rb_raise(rb_eTypeError
,
385 "second argument must be a Hash when first"
389 * we can optimize this so there's no intermediate
392 event
= rb_funcall(name
, id_new
, 1, event
);
393 return emit_struct(self
, event
);
395 if (rb_obj_is_kind_of(name
, cLWES_Event
))
396 return emit_event(self
, name
);
397 rb_raise(rb_eArgError
,
398 "bad argument: %s, must be a String, Struct or Class",
399 RAISE_INSPECT(name
));
402 assert(0 && "should never get here");
407 * Destroys the associated lwes_emitter and the associated socket. This
408 * method is rarely needed as Ruby garbage collection will take care of
409 * closing for you, but may be useful in odd cases when it is desirable
410 * to release file descriptors ASAP.
412 static VALUE
emitter_close(VALUE self
)
414 struct _rb_lwes_emitter
*rle
= _rle(self
);
417 lwes_emitter_destroy(rle
->emitter
);
423 static void lwesrb_emitter_create(struct _rb_lwes_emitter
*rle
)
427 if (rle
->ttl
== UINT32_MAX
)
428 rle
->emitter
= lwes_emitter_create(
429 rle
->address
, rle
->iface
, rle
->port
,
430 rle
->emit_heartbeat
, rle
->freq
);
432 rle
->emitter
= lwes_emitter_create_with_ttl(
433 rle
->address
, rle
->iface
, rle
->port
,
434 rle
->emit_heartbeat
, rle
->freq
, rle
->ttl
);
437 if (--gc_retry
== 0) {
441 rb_raise(rb_eRuntimeError
, "failed to create LWES emitter");
446 static VALUE
init_copy(VALUE dest
, VALUE obj
)
448 struct _rb_lwes_emitter
*dst
= _rle(dest
);
449 struct _rb_lwes_emitter
*src
= _rle(obj
);
451 memcpy(dst
, src
, sizeof(*dst
));
452 dst
->address
= my_strdup(src
->address
);
454 dst
->iface
= my_strdup(src
->iface
);
455 lwesrb_emitter_create(dst
);
457 assert(dst
->emitter
&& dst
->emitter
!= src
->emitter
&&
458 "emitter not a copy");
463 /* should only be used internally by #initialize */
464 static VALUE
_create(VALUE self
, VALUE options
)
466 struct _rb_lwes_emitter
*rle
= _rle(self
);
467 VALUE address
, iface
, port
, heartbeat
, ttl
;
469 rle
->emit_heartbeat
= FALSE
;
471 rle
->ttl
= UINT32_MAX
; /* nobody sets a ttl this long, right? */
474 rb_raise(rb_eRuntimeError
, "already created lwes_emitter");
475 if (TYPE(options
) != T_HASH
)
476 rb_raise(rb_eTypeError
, "options must be a hash");
478 address
= rb_hash_aref(options
, ID2SYM(rb_intern("address")));
479 if (TYPE(address
) != T_STRING
)
480 rb_raise(rb_eTypeError
, ":address must be a string");
481 rle
->address
= my_strdup(StringValueCStr(address
));
483 iface
= rb_hash_aref(options
, ID2SYM(rb_intern("iface")));
484 switch (TYPE(iface
)) {
489 rle
->iface
= my_strdup(StringValueCStr(iface
));
492 rb_raise(rb_eTypeError
, ":iface must be a String or nil");
495 port
= rb_hash_aref(options
, ID2SYM(rb_intern("port")));
496 if (TYPE(port
) != T_FIXNUM
)
497 rb_raise(rb_eTypeError
, ":port must be a Fixnum");
498 rle
->port
= NUM2UINT(port
);
500 heartbeat
= rb_hash_aref(options
, ID2SYM(rb_intern("heartbeat")));
501 if (TYPE(heartbeat
) == T_FIXNUM
) {
502 int tmp
= NUM2INT(heartbeat
);
504 rb_raise(rb_eArgError
,":heartbeat > INT16_MAX seconds");
505 rle
->emit_heartbeat
= TRUE
;
506 rle
->freq
= (LWES_INT_16
)tmp
;
507 } else if (NIL_P(heartbeat
)) { /* do nothing, use defaults */
509 rb_raise(rb_eTypeError
, ":heartbeat must be a Fixnum or nil");
511 ttl
= rb_hash_aref(options
, ID2SYM(rb_intern("ttl")));
512 if (TYPE(ttl
) == T_FIXNUM
) {
513 unsigned LONG_LONG tmp
= NUM2ULL(ttl
);
514 if (tmp
>= UINT32_MAX
)
515 rb_raise(rb_eArgError
, ":ttl >= UINT32_MAX seconds");
516 rle
->ttl
= (LWES_U_INT_32
)tmp
;
517 } else if (NIL_P(ttl
)) { /* do nothing, no ttl */
519 rb_raise(rb_eTypeError
, ":ttl must be a Fixnum or nil");
521 lwesrb_emitter_create(rle
);
526 /* Init_lwes_ext will call this */
527 void lwesrb_init_emitter(void)
529 VALUE mLWES
= rb_define_module("LWES");
530 VALUE cLWES_Emitter
=
531 rb_define_class_under(mLWES
, "Emitter", rb_cObject
);
533 rb_define_method(cLWES_Emitter
, "<<", emitter_ltlt
, 1);
534 rb_define_method(cLWES_Emitter
, "emit", emitter_emit
, -1);
535 rb_define_method(cLWES_Emitter
, "_create", _create
, 1);
536 rb_define_method(cLWES_Emitter
, "close", emitter_close
, 0);
537 rb_define_method(cLWES_Emitter
, "initialize_copy", init_copy
, 1);
538 rb_define_alloc_func(cLWES_Emitter
, rle_alloc
);
539 LWESRB_MKID(TYPE_DB
);
540 LWESRB_MKID(TYPE_LIST
);
542 LWESRB_MKID(HAVE_ENCODING
);
545 id_enc
= rb_intern(LWES_ENCODING
);
546 sym_enc
= ID2SYM(id_enc
);
548 ENC
= rb_obj_freeze(rb_str_new2(LWES_ENCODING
));
549 rb_define_const(mLWES
, "ENCODING", ENC
);