/*
 * we're currently too lazy to use rb_ensure to free an allocation, so we
 * abuse the rb_str_* API for a temporary buffer
 */
/*
 * NOTE(review): nothing visible in this file reads RSTRING_MODIFIED; it is
 * presumably consumed by a project header included after this point (compare
 * the RSTRING_MODIFY FIXME on the vec_buf member below) -- confirm before
 * touching it.
 */
#define RSTRING_MODIFIED 1
/*
 * Stand-in that replaces writev(): calling it always trips an assertion.
 *
 * All three parameters are ignored.  Returns -1 so that a build with NDEBUG
 * (where assert() expands to nothing) still returns a defined value instead
 * of falling off the end of a non-void function.
 *
 * NOTE(review): the indented "# define" suggests this lives inside a
 * preprocessor conditional that is not visible here -- confirm.
 */
static ssize_t assert_writev(int fd, void* iov, int len)
{
	(void)fd;
	(void)iov;
	(void)len;

	assert(0 && "you should not try to call writev");
	return -1;
}
# define writev assert_writev
23 #ifndef HAVE_RB_ARY_SUBSEQ
24 static inline VALUE
my_ary_subseq(VALUE ary
, long idx
, long len
)
26 VALUE args
[2] = { LONG2FIX(idx
), LONG2FIX(len
) };
28 return rb_ary_aref(2, args
, ary
);
30 #define rb_ary_subseq my_ary_subseq
33 static VALUE sym_wait_writable
;
/*
 * Every later "struct iovec" in this file is spelled "struct my_iovec" after
 * preprocessing.
 *
 * NOTE(review): the code below needs a struct with iov_base and iov_len
 * members; neither its definition nor any guard around this alias is visible
 * here -- confirm both exist.
 */
#define iovec my_iovec
/* tests for choosing the following constants were done on Linux 3.0 x86_64
 * (Ubuntu 12.04) Core i3 i3-2330M slowed to 1600MHz
 * testing script https://gist.github.com/2850641
 * feel free to do more thorough testing and choose better values
 */
/* tests show that it is meaningless to set WRITEV_MEMLIMIT to more than 1M
 * even when tcp_wmem is set to a relatively high value (2M) (in fact, it
 * becomes even slower). 512K performs a bit better in the average case.
 *
 * fill_iovec() stops adding strings to a batch once their combined length
 * would exceed this many bytes (the first string is always taken). */
#define WRITEV_MEMLIMIT (512*1024)
/* the same test shows that custom_writev is faster than glibc writev when
 * the average string is smaller than ~500 bytes and slower when the average
 * string is greater than ~600 bytes. 512 bytes was chosen because current
 * compilers turn x/512 into x>>9.
 *
 * my_writev() picks writev() only when batch_len / WRITEV_IMPL_THRESHOLD
 * exceeds the number of strings in the batch. */
#define WRITEV_IMPL_THRESHOLD 512
/*
 * Upper bound on the number of iovec entries per batch.
 * init_kgio_writev() lowers it when the system limit is smaller.
 */
static int iov_max = 1024;
64 VALUE vec_buf
; /* FIXME: this requires RSTRING_MODIFY for rbx */
68 int something_written
;
/*
 * Emulates writev() with a single write(): the iov_cnt buffers described by
 * vec are copied back to back into a temporary heap buffer, which is then
 * written to fd in one call.
 *
 * total_len is the size of that temporary buffer and the byte count handed
 * to write(); the caller must pass the sum of all iov_len values.
 *
 * Returns the result of write(), or -1 when the temporary buffer cannot be
 * allocated.  errno as left by write() is preserved across free().
 */
static ssize_t custom_writev(int fd, const struct iovec *vec, int iov_cnt, size_t total_len)
{
	char *buf;
	char *dst;
	ssize_t result;
	int saved_errno;
	int i;

	/* we do not want to use ruby's xmalloc because
	 * it can fire GC, and we'll free buffer shortly anyway.
	 * malloc(0) is allowed to return NULL, so never request 0 bytes:
	 * a batch of empty strings must not look like an allocation failure */
	buf = malloc(total_len ? total_len : 1);
	if (buf == NULL)
		return -1;

	dst = buf;
	for (i = 0; i < iov_cnt; i++) {
		/* skip empty entries so memcpy never sees a zero-length source */
		if (vec[i].iov_len == 0)
			continue;
		memcpy(dst, vec[i].iov_base, vec[i].iov_len);
		dst += vec[i].iov_len;
	}

	result = write(fd, buf, total_len);

	/* free() is not expected to change errno, but save it anyway */
	saved_errno = errno;
	free(buf);
	errno = saved_errno;

	return result;
}
100 static void prepare_writev(struct wrv_args
*a
, VALUE io
, VALUE ary
)
103 a
->fd
= my_fileno(io
);
104 a
->something_written
= 0;
106 if (TYPE(ary
) == T_ARRAY
)
107 /* rb_ary_subseq will not copy array unless it modified */
108 a
->buf
= rb_ary_subseq(ary
, 0, RARRAY_LEN(ary
));
110 a
->buf
= rb_Array(ary
);
112 a
->vec_buf
= rb_str_new(0, 0);
116 #ifndef RARRAY_LENINT
117 static inline int rarray_int(VALUE val
)
119 long num
= RARRAY_LEN(val
);
121 if ((long)(int)num
!= num
)
122 rb_raise(rb_eRangeError
, "%ld cannot to be an int", num
);
126 #define RARRAY_LENINT(n) rarray_int(n)
129 static void fill_iovec(struct wrv_args
*a
)
132 struct iovec
*curvec
;
134 a
->iov_cnt
= RARRAY_LENINT(a
->buf
);
136 if (a
->iov_cnt
== 0) return;
137 if (a
->iov_cnt
> iov_max
) a
->iov_cnt
= iov_max
;
138 rb_str_resize(a
->vec_buf
, sizeof(struct iovec
) * a
->iov_cnt
);
139 curvec
= a
->vec
= (struct iovec
*)RSTRING_PTR(a
->vec_buf
);
141 for (i
=0; i
< a
->iov_cnt
; i
++, curvec
++) {
142 VALUE str
= rb_ary_entry(a
->buf
, i
);
143 long str_len
, next_len
;
145 if (TYPE(str
) != T_STRING
) {
146 str
= rb_obj_as_string(str
);
147 rb_ary_store(a
->buf
, i
, str
);
150 str_len
= RSTRING_LEN(str
);
152 /* lets limit total memory to write,
153 * but always take first string */
154 next_len
= a
->batch_len
+ str_len
;
155 if (i
&& next_len
> WRITEV_MEMLIMIT
) {
159 a
->batch_len
= next_len
;
161 curvec
->iov_base
= RSTRING_PTR(str
);
162 curvec
->iov_len
= str_len
;
166 static long trim_writev_buffer(struct wrv_args
*a
, ssize_t n
)
169 long ary_len
= RARRAY_LEN(a
->buf
);
171 if (n
== (ssize_t
)a
->batch_len
) {
175 for (i
= 0; n
&& i
< ary_len
; i
++) {
176 VALUE entry
= rb_ary_entry(a
->buf
, i
);
177 n
-= (ssize_t
)RSTRING_LEN(entry
);
184 assert(n
== 0 && "writev system call is broken");
189 /* partially done, remove fully-written buffers */
191 a
->buf
= rb_ary_subseq(a
->buf
, i
, ary_len
- i
);
193 /* setup+replace partially written buffer */
195 VALUE str
= rb_ary_entry(a
->buf
, 0);
196 long str_len
= RSTRING_LEN(str
);
197 str
= MY_STR_SUBSEQ(str
, str_len
+ n
, -n
);
198 rb_ary_store(a
->buf
, 0, str
);
200 return RARRAY_LEN(a
->buf
);
204 writev_check(struct wrv_args
*a
, ssize_t n
, const char *msg
, int io_wait
)
207 if (n
> 0) a
->something_written
= 1;
208 return trim_writev_buffer(a
, n
);
210 if (errno
== EINTR
) {
211 a
->fd
= my_fileno(a
->io
);
214 if (errno
== EAGAIN
) {
216 (void)kgio_call_wait_writable(a
->io
);
218 } else if (!a
->something_written
) {
219 a
->buf
= sym_wait_writable
;
223 kgio_wr_sys_fail(msg
);
228 static VALUE
my_writev(VALUE io
, VALUE ary
, int io_wait
)
233 prepare_writev(&a
, io
, ary
);
234 set_nonblocking(a
.fd
);
240 else if (a
.iov_cnt
== 1)
241 n
= write(a
.fd
, a
.vec
[0].iov_base
, a
.vec
[0].iov_len
);
242 /* for big strings use library function */
243 else if (USE_WRITEV
&&
244 ((long)(a
.batch_len
/WRITEV_IMPL_THRESHOLD
) > a
.iov_cnt
))
245 n
= writev(a
.fd
, a
.vec
, a
.iov_cnt
);
247 n
= custom_writev(a
.fd
, a
.vec
, a
.iov_cnt
, a
.batch_len
);
248 } while (writev_check(&a
, n
, "writev", io_wait
) != 0);
249 rb_str_resize(a
.vec_buf
, 0);
251 if (TYPE(a
.buf
) != T_SYMBOL
)
252 kgio_autopush_write(io
);
259 * io.kgio_writev(array) -> nil
261 * Returns nil when the write completes.
263 * This may block and call any method defined to +kgio_wait_writable+
266 * Note: it uses +Array()+ semantic for converting argument, so that
267 * it will succeed if you pass something else.
269 static VALUE
kgio_writev(VALUE io
, VALUE ary
)
271 return my_writev(io
, ary
, 1);
277 * io.kgio_trywritev(array) -> nil, Array or :wait_writable
279 * Returns nil if the write was completed in full.
281 * Returns an Array of strings containing the unwritten portion
282 * if EAGAIN was encountered, but some portion was successfully written.
284 * Returns :wait_writable if EAGAIN is encountered and nothing
287 * Note: it uses +Array()+ semantic for converting argument, so that
288 * it will succeed if you pass something else.
290 static VALUE
kgio_trywritev(VALUE io
, VALUE ary
)
292 return my_writev(io
, ary
, 0);
298 * Kgio.trywritev(io, array) -> nil, Array or :wait_writable
300 * Returns nil if the write was completed in full.
302 * Returns a Array of strings containing the unwritten portion if EAGAIN
303 * was encountered, but some portion was successfully written.
305 * Returns :wait_writable if EAGAIN is encountered and nothing
308 * Maybe used in place of PipeMethods#kgio_trywritev for non-Kgio objects
310 static VALUE
s_trywritev(VALUE mod
, VALUE io
, VALUE ary
)
312 return kgio_trywritev(io
, ary
);
315 void init_kgio_writev(void)
318 int sys_iov_max
= IOV_MAX
;
320 int sys_iov_max
= (int)sysconf(_SC_IOV_MAX
);
323 VALUE mPipeMethods
, mSocketMethods
;
324 VALUE mKgio
= rb_define_module("Kgio");
326 if (sys_iov_max
< iov_max
)
327 iov_max
= sys_iov_max
;
329 sym_wait_writable
= ID2SYM(rb_intern("wait_writable"));
331 rb_define_singleton_method(mKgio
, "trywritev", s_trywritev
, 2);
333 mPipeMethods
= rb_define_module_under(mKgio
, "PipeMethods");
334 rb_define_method(mPipeMethods
, "kgio_writev", kgio_writev
, 1);
335 rb_define_method(mPipeMethods
, "kgio_trywritev", kgio_trywritev
, 1);
337 mSocketMethods
= rb_define_module_under(mKgio
, "SocketMethods");
338 rb_define_method(mSocketMethods
, "kgio_writev", kgio_writev
, 1);
339 rb_define_method(mSocketMethods
, "kgio_trywritev", kgio_trywritev
, 1);