/*
 * we're currently too lazy to use rb_ensure to free an allocation, so we
 * abuse the rb_str_* API for a temporary buffer
 */
5 #define RSTRING_MODIFIED 1
15 static ssize_t
assert_writev(int fd
, void* iov
, int len
)
17 assert(0 && "you should not try to call writev");
20 # define writev assert_writev
23 #ifndef HAVE_RB_ARY_SUBSEQ
24 static inline VALUE
my_ary_subseq(VALUE ary
, long idx
, long len
)
26 VALUE args
[2] = { LONG2FIX(idx
), LONG2FIX(len
) };
28 return rb_ary_aref(2, args
, ary
);
30 #define MY_ARY_SUBSEQ(ary,idx,len) my_ary_subseq((ary),(idx),(len))
32 #define MY_ARY_SUBSEQ(ary,idx,len) rb_ary_subseq((ary),(idx),(len))
/* the :wait_writable Symbol; assigned once in init_kgio_writev() */
35 static VALUE sym_wait_writable
;
38 #define iovec my_iovec
/* Tests for choosing the following constants were done on Linux 3.0 x86_64
 * (Ubuntu 12.04), Core i3 i3-2330M slowed to 1600MHz.
 * Testing script: https://gist.github.com/2850641
 * Feel free to do more thorough testing and choose better values.
 */
/* Tests show that it is pointless to set WRITEV_MEMLIMIT to more than 1M,
 * even when tcp_wmem is set to a relatively high value (2M); in fact, it
 * becomes even slower. 512K performs a bit better in the average case. */
54 #define WRITEV_MEMLIMIT (512*1024)
/* The same test shows that custom_writev is faster than glibc writev when
 * the average string is smaller than ~500 bytes and slower when the average
 * string is larger than ~600 bytes. 512 bytes was chosen because current
 * compilers turn x/512 into x>>9. */
59 #define WRITEV_IMPL_THRESHOLD 512
/* upper bound on the number of iovec entries used per write attempt */
61 static int iov_max
= 1024; /* this could be overridden in init */
66 VALUE vec_buf
; /* FIXME: this requires RSTRING_MODIFY for rbx */
70 int something_written
;
/*
 * Gather the iov_cnt buffers described by vec into one temporary heap
 * buffer and hand it to a single write(2) call.
 *
 * fd        - descriptor to write to
 * vec       - array of iov_cnt buffers
 * iov_cnt   - number of entries in vec
 * total_len - sum of all vec[i].iov_len
 *
 * Returns whatever write(2) returns.  On allocation failure returns -1 with
 * errno set to ENOMEM; if the entries do not fit into total_len bytes,
 * returns -1 with errno set to EINVAL.  errno as left by write(2) is
 * preserved across the cleanup.
 */
static ssize_t
custom_writev(int fd, const struct iovec *vec, int iov_cnt, size_t total_len)
{
	char *buf;
	size_t used = 0;
	ssize_t result;
	int saved_errno;
	int i;

	/* malloc(0) is allowed to return NULL; that is not an error here */
	if (total_len == 0)
		return write(fd, "", 0);

	/* we do not want to use ruby's xmalloc because
	 * it can fire GC, and we'll free buffer shortly anyway */
	buf = malloc(total_len);
	if (buf == NULL) {
		errno = ENOMEM;
		return -1;
	}

	for (i = 0; i < iov_cnt; i++) {
		size_t len = vec[i].iov_len;

		/* never copy past the end of the allocation */
		if (len > total_len - used) {
			free(buf);
			errno = EINVAL;
			return -1;
		}
		memcpy(buf + used, vec[i].iov_base, len);
		used += len;
	}

	result = write(fd, buf, total_len);

	/* free() may alter errno */
	saved_errno = errno;
	free(buf);
	errno = saved_errno;

	return result;
}
101 static void prepare_writev(struct wrv_args
*a
, VALUE io
, VALUE ary
)
104 a
->fd
= my_fileno(io
);
105 a
->something_written
= 0;
107 if (TYPE(ary
) == T_ARRAY
)
108 /* rb_ary_subseq will not copy array unless it modified */
109 a
->buf
= MY_ARY_SUBSEQ(ary
, 0, RARRAY_LEN(ary
));
111 a
->buf
= rb_Array(ary
);
113 a
->vec_buf
= rb_str_new(0, 0);
117 #ifndef RARRAY_LENINT
118 static inline int rarray_int(VALUE val
)
120 long num
= RARRAY_LEN(val
);
122 if ((long)(int)num
!= num
)
123 rb_raise(rb_eRangeError
, "%ld cannot to be an int", num
);
127 #define RARRAY_LENINT(n) rarray_int(n)
130 static void fill_iovec(struct wrv_args
*a
)
133 struct iovec
*curvec
;
135 a
->iov_cnt
= RARRAY_LENINT(a
->buf
);
137 if (a
->iov_cnt
== 0) return;
138 if (a
->iov_cnt
> iov_max
) a
->iov_cnt
= iov_max
;
139 rb_str_resize(a
->vec_buf
, sizeof(struct iovec
) * a
->iov_cnt
);
140 curvec
= a
->vec
= (struct iovec
*)RSTRING_PTR(a
->vec_buf
);
142 for (i
=0; i
< a
->iov_cnt
; i
++, curvec
++) {
143 VALUE str
= rb_ary_entry(a
->buf
, i
);
144 long str_len
, next_len
;
146 if (TYPE(str
) != T_STRING
) {
147 str
= rb_obj_as_string(str
);
148 rb_ary_store(a
->buf
, i
, str
);
151 str_len
= RSTRING_LEN(str
);
153 /* lets limit total memory to write,
154 * but always take first string */
155 next_len
= a
->batch_len
+ str_len
;
156 if (i
&& next_len
> WRITEV_MEMLIMIT
) {
160 a
->batch_len
= next_len
;
162 curvec
->iov_base
= RSTRING_PTR(str
);
163 curvec
->iov_len
= str_len
;
/*
 * Account for n bytes having been written out of a->buf and return the
 * number of entries left in a->buf.
 *
 * From the visible code: n is compared with a->batch_len (the whole batch);
 * otherwise the entries of a->buf are walked, subtracting each string's
 * length from n.  Leading entries are then dropped with MY_ARY_SUBSEQ and
 * the first remaining entry is replaced by a substring of itself.
 *
 * NOTE(review): several conditions and branch bodies of this function are
 * not visible in this excerpt (including the declaration of i) -- confirm
 * the exact control flow against the full file.
 */
167 static long trim_writev_buffer(struct wrv_args
*a
, ssize_t n
)
170 long ary_len
= RARRAY_LEN(a
->buf
);
/* did the write cover the whole batch? */
172 if (n
== (ssize_t
)a
->batch_len
) {
/* consume n entry by entry; the loop ends when n reaches 0 or entries run out */
176 for (i
= 0; n
&& i
< ary_len
; i
++) {
177 VALUE entry
= rb_ary_entry(a
->buf
, i
);
178 n
-= (ssize_t
)RSTRING_LEN(entry
);
/* sanity check: no bytes may be left unaccounted for at this point */
185 assert(n
== 0 && "writev system call is broken");
190 /* partially done, remove fully-written buffers */
192 a
->buf
= MY_ARY_SUBSEQ(a
->buf
, i
, ary_len
- i
);
194 /* setup+replace partially written buffer */
/* presumably n is negative here, so -n bytes starting at offset
 * str_len + n are what is still unwritten -- TODO confirm */
196 VALUE str
= rb_ary_entry(a
->buf
, 0);
197 long str_len
= RSTRING_LEN(str
);
198 str
= MY_STR_SUBSEQ(str
, str_len
+ n
, -n
);
199 rb_ary_store(a
->buf
, 0, str
);
201 return RARRAY_LEN(a
->buf
);
/*
 * Interpret the result n of one write attempt on the batch described by a.
 *
 * a       - state of the write in progress
 * n       - return value of the write/writev/custom_writev call
 * msg     - passed to kgio_wr_sys_fail()
 * io_wait - NOTE(review): not referenced in the visible lines; presumably
 *           selects between waiting and returning on EAGAIN -- confirm
 *
 * From the visible code: a positive n sets a->something_written and the
 * result of trim_writev_buffer() is returned; on EINTR the descriptor is
 * re-read from a->io; on EAGAIN either kgio_call_wait_writable() is called
 * or, if nothing has been written so far, a->buf is set to
 * sym_wait_writable; kgio_wr_sys_fail(msg) is called after those checks.
 *
 * NOTE(review): the return type, the enclosing conditions and the return
 * statements of the error paths are not visible in this excerpt.
 * my_writev() keeps looping while this function returns non-zero.
 */
205 writev_check(struct wrv_args
*a
, ssize_t n
, const char *msg
, int io_wait
)
208 if (n
> 0) a
->something_written
= 1;
209 return trim_writev_buffer(a
, n
);
211 if (errno
== EINTR
) {
/* look the descriptor up again before the next attempt */
212 a
->fd
= my_fileno(a
->io
);
215 if (errno
== EAGAIN
) {
217 (void)kgio_call_wait_writable(a
->io
);
219 } else if (!a
->something_written
) {
/* nothing written at all: hand :wait_writable back through a->buf */
220 a
->buf
= sym_wait_writable
;
224 kgio_wr_sys_fail(msg
);
/*
 * Shared implementation behind kgio_writev() (io_wait = 1) and
 * kgio_trywritev() (io_wait = 0).
 *
 * From the visible code: the argument block is set up with
 * prepare_writev(), set_nonblocking() is called on the descriptor, and one
 * of write(), writev() or custom_writev() is issued per iteration until
 * writev_check() returns 0.  Afterwards a.vec_buf is resized to 0.
 *
 * NOTE(review): the local declarations, the head of the do/while loop, the
 * first branch of the if/else chain and the return statement are not
 * visible in this excerpt -- confirm against the full file.
 */
229 static VALUE
my_writev(VALUE io
, VALUE ary
, int io_wait
)
234 prepare_writev(&a
, io
, ary
);
235 set_nonblocking(a
.fd
);
/* a single buffer needs no gathering: plain write() */
241 else if (a
.iov_cnt
== 1)
242 n
= write(a
.fd
, a
.vec
[0].iov_base
, a
.vec
[0].iov_len
);
243 /* for big strings use library function */
/* batch_len / WRITEV_IMPL_THRESHOLD > iov_cnt means the average buffer
 * is larger than WRITEV_IMPL_THRESHOLD bytes */
244 else if (USE_WRITEV
&&
245 ((long)(a
.batch_len
/WRITEV_IMPL_THRESHOLD
) > a
.iov_cnt
))
246 n
= writev(a
.fd
, a
.vec
, a
.iov_cnt
);
/* presumably the final else branch: copy everything into one buffer
 * and write() it -- TODO confirm */
248 n
= custom_writev(a
.fd
, a
.vec
, a
.iov_cnt
, a
.batch_len
);
249 } while (writev_check(&a
, n
, "writev", io_wait
) != 0);
/* shrink the String that held the iovec array */
250 rb_str_resize(a
.vec_buf
, 0);
258 * io.kgio_writev(array) -> nil
260 * Returns nil when the write completes.
262 * This may block and call any method defined to +kgio_wait_writable+
265 * Note: it uses +Array()+ semantic for converting argument, so that
266 * it will succeed if you pass something else.
268 static VALUE
kgio_writev(VALUE io
, VALUE ary
)
270 return my_writev(io
, ary
, 1);
276 * io.kgio_trywritev(array) -> nil, Array or :wait_writable
278 * Returns nil if the write was completed in full.
280 * Returns an Array of strings containing the unwritten portion
281 * if EAGAIN was encountered, but some portion was successfully written.
283 * Returns :wait_writable if EAGAIN is encountered and nothing
286 * Note: it uses +Array()+ semantic for converting argument, so that
287 * it will succeed if you pass something else.
289 static VALUE
kgio_trywritev(VALUE io
, VALUE ary
)
291 return my_writev(io
, ary
, 0);
297 * Kgio.trywritev(io, array) -> nil, Array or :wait_writable
299 * Returns nil if the write was completed in full.
301 * Returns a Array of strings containing the unwritten portion if EAGAIN
302 * was encountered, but some portion was successfully written.
304 * Returns :wait_writable if EAGAIN is encountered and nothing
307 * Maybe used in place of PipeMethods#kgio_trywritev for non-Kgio objects
309 static VALUE
s_trywritev(VALUE mod
, VALUE io
, VALUE ary
)
311 return kgio_trywritev(io
, ary
);
314 void init_kgio_writev(void)
317 int sys_iov_max
= IOV_MAX
;
319 int sys_iov_max
= (int)sysconf(_SC_IOV_MAX
);
322 VALUE mPipeMethods
, mSocketMethods
;
323 VALUE mKgio
= rb_define_module("Kgio");
325 if (sys_iov_max
< iov_max
)
326 iov_max
= sys_iov_max
;
328 sym_wait_writable
= ID2SYM(rb_intern("wait_writable"));
330 rb_define_singleton_method(mKgio
, "trywritev", s_trywritev
, 2);
332 mPipeMethods
= rb_define_module_under(mKgio
, "PipeMethods");
333 rb_define_method(mPipeMethods
, "kgio_writev", kgio_writev
, 1);
334 rb_define_method(mPipeMethods
, "kgio_trywritev", kgio_trywritev
, 1);
336 mSocketMethods
= rb_define_module_under(mKgio
, "SocketMethods");
337 rb_define_method(mSocketMethods
, "kgio_writev", kgio_writev
, 1);
338 rb_define_method(mSocketMethods
, "kgio_trywritev", kgio_trywritev
, 1);