cosmetix
[iv.d.git] / ncrpc.d
blobb0e85b9750e0882735cdb9c7d04c8fbff0f156e2
1 /* Written by Ketmar // Invisible Vector <ketmar@ketmar.no-ip.org>
2 * Understanding is not required. Only obedience.
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 // very simple serializer and RPC system
18 // WARNING! do not use for disk and other sensitive serialization,
19 // as format may change without notice! at least version it!
20 module iv.ncrpc /*is aliced*/;
21 private:
23 import iv.alice;
24 import iv.vfs;
25 //version(rdmd) import iv.strex;
28 // ////////////////////////////////////////////////////////////////////////// //
/// UDA marker: a struct field tagged with `@NCIgnore` is neither written by `ncser` nor read by `ncunser`.
public enum NCIgnore;
/// UDA: store the tagged struct field under `name` instead of its declared identifier.
public struct NCName {
  string name; /// field name as it appears in the stream
}
/// Type tags written into serialized type headers.
/// The kind occupies the high nibble; for scalar kinds (`Bool`..`Float`) the
/// low nibble is OR'ed with the value's size in bytes.
enum NCEntryType : ubyte {
  End    = 0x00, // WARNING! SHOULD BE ZERO! (a zero name length terminates a struct field list)
  Bool   = 0x10,
  Char   = 0x20,
  Int    = 0x30,
  Uint   = 0x40,
  Float  = 0x50,
  Struct = 0x60,
  Array  = 0x70,
  Dict   = 0x80,
}
46 // ////////////////////////////////////////////////////////////////////////// //
// skip one serialized value (type header plus data) without decoding it
//
// the stream layout mirrors what `ncser` writes:
//   * the complete type header comes first (for arrays: tag, dimension count
//     and the *full* element header; for dicts: tag, value header, key header;
//     for structs: tag and the struct name);
//   * the data follows, and repeats no header information except for struct
//     fields, each of which is a complete nested value (header plus data).
//
// throws `Exception` on an invalid type tag
public void ncskip(ST) (auto ref ST st) if (isReadableStream!ST) {
  // parsed type header
  static struct TypeDesc {
    ubyte tag; // kind in the high nibble, scalar byte size in the low nibble
    ubyte dimc; // arrays only: number of dimensions
    TypeDesc* first; // arrays: element type; dicts: first header on the wire (value type)
    TypeDesc* second; // dicts: second header on the wire (key type)
  }

  // discard `count` raw bytes; `ulong` so that `length*size` cannot overflow
  void skip (ulong count) {
    ubyte[64] buf = void;
    while (count > 0) {
      immutable usize rd = (count > buf.length ? buf.length : cast(usize)count);
      st.rawReadExact(buf[0..rd]);
      count -= rd;
    }
  }

  // read a complete (possibly nested) type header
  TypeDesc* readHeader () {
    auto td = new TypeDesc;
    td.tag = st.readNum!ubyte;
    switch (td.tag&0xf0) {
      case NCEntryType.Bool:
      case NCEntryType.Char:
      case NCEntryType.Int:
      case NCEntryType.Uint:
      case NCEntryType.Float:
        break;
      case NCEntryType.Struct:
        if ((td.tag&0x0f) != 0) throw new Exception("invalid struct type");
        skip(st.readNum!ubyte); // struct name (length-prefixed), written once per header
        break;
      case NCEntryType.Array:
        if ((td.tag&0x0f) != 0) throw new Exception("invalid array type");
        td.dimc = st.readNum!ubyte; // dimension count
        if (td.dimc == 0) throw new Exception("invalid array type");
        td.first = readHeader(); // element type
        break;
      case NCEntryType.Dict:
        if ((td.tag&0x0f) != 0) throw new Exception("invalid dict type");
        td.first = readHeader(); // `ncser` emits the value type header first...
        td.second = readHeader(); // ...and the key type header second
        break;
      default: throw new Exception("invalid data type");
    }
    return td;
  }

  // skip the data of `count` consecutive values described by `td`
  void skipData (const(TypeDesc)* td, ulong count) {
    switch (td.tag&0xf0) {
      case NCEntryType.Bool:
      case NCEntryType.Char:
      case NCEntryType.Int:
      case NCEntryType.Uint:
      case NCEntryType.Float:
        skip(count*(td.tag&0x0f)); // low nibble is the element size
        break;
      case NCEntryType.Struct:
        while (count-- > 0) {
          // fields: name length, name, then a complete nested value
          for (;;) {
            immutable nlen = st.readNum!ubyte; // name length
            if (nlen == NCEntryType.End) break;
            skip(nlen); // name
            skipData(readHeader(), 1);
          }
        }
        break;
      case NCEntryType.Array:
        while (count-- > 0) {
          // each dimension is prefixed with its length; only the innermost one holds elements
          void skipDim (uint dcleft) {
            immutable len = st.readXInt!usize;
            if (dcleft == 1) {
              skipData(td.first, len);
            } else {
              foreach (immutable _; 0..len) skipDim(dcleft-1);
            }
          }
          skipDim(td.dimc);
        }
        break;
      case NCEntryType.Dict:
        while (count-- > 0) {
          foreach (immutable _; 0..st.readXInt!usize) {
            skipData(td.second, 1); // key data is written first
            skipData(td.first, 1); // then the value data
          }
        }
        break;
      default: throw new Exception("invalid data type");
    }
  }

  skipData(readHeader(), 1);
}
134 // ////////////////////////////////////////////////////////////////////////// //
// read one serialized data block and return its raw bytes
// the block is walked with `ncskip` through a pass-through reader that
// records every byte the underlying stream delivers
public ubyte[] ncreadBytes(ST) (auto ref ST st) if (isReadableStream!ST) {
  ubyte[] collected;

  // forwards reads to `st` and appends whatever was read to `collected`
  struct TeeReader {
    void[] rawRead (void[] dest) {
      auto got = st.rawRead(dest);
      if (got.length != 0) collected ~= cast(const(ubyte)[])got;
      return got;
    }
  }

  auto tee = wrapStream(TeeReader());
  tee.ncskip;
  return collected;
}
// read one serialized data block and return it wrapped as a read-only memory stream
public VFile ncread(ST) (auto ref ST st) if (isReadableStream!ST) {
  auto bytes = st.ncreadBytes;
  return bytes.wrapMemoryRO;
}
157 // ////////////////////////////////////////////////////////////////////////// //
// true for types (qualifiers ignored) that are stored as raw bytes:
// `bool`, anything `__traits(isIntegral)` accepts, and floating point types
template isSimpleType(T) {
  private import std.traits : Unqual;
  private alias Bare = Unqual!T;
  static if (is(Bare == bool)) {
    enum isSimpleType = true;
  } else {
    enum isSimpleType = (__traits(isIntegral, Bare) || __traits(isFloating, Bare));
  }
}
// write a single byte to the stream
void ncWriteUbyte(ST) (auto ref ST fl, ubyte b) {
  ubyte[1] one = b;
  fl.rawWriteExact(one[]);
}
// read a single byte from the stream
ubyte ncReadUbyte(ST) (auto ref ST fl) {
  ubyte[1] one;
  fl.rawReadExact(one[]);
  return one[0];
}
177 // ////////////////////////////////////////////////////////////////////////// //
// serialize `v` to `fl`: a complete type header first, then the data
//
// scalars and arrays of scalars are written as raw in-memory bytes; struct
// fields are written as (name length, name, complete nested value) and the
// field list is closed with `NCEntryType.End`; fields tagged `@NCIgnore` are
// skipped, and the first `@NCName` (if any) replaces the field name
public void ncser(T, ST) (auto ref ST fl, in auto ref T v) if (!is(T == class) && isWriteableStream!ST) {
  import std.traits : Unqual;

  // emit the type header for `HT`
  void emitHeader(HT) () {
    alias Bare = Unqual!HT;
    static if (is(Bare : E[], E)) {
      // array: tag, dimension count, header of the innermost element type
      enum dims = dimensionCount!Bare;
      static assert(dims <= 255, "too many array dimenstions");
      fl.ncWriteUbyte(NCEntryType.Array);
      fl.ncWriteUbyte(cast(ubyte)dims);
      emitHeader!(arrayElementType!Bare);
    } else static if (is(Bare : VT[KT], KT, VT)) {
      // dict: tag, then the VALUE type header, then the KEY type header
      fl.ncWriteUbyte(NCEntryType.Dict);
      emitHeader!(Unqual!VT);
      emitHeader!(Unqual!KT);
    } else static if (is(Bare == bool)) {
      fl.ncWriteUbyte(cast(ubyte)(NCEntryType.Bool|bool.sizeof));
    } else static if (is(Bare == char) || is(Bare == wchar) || is(Bare == dchar)) {
      fl.ncWriteUbyte(cast(ubyte)(NCEntryType.Char|Bare.sizeof));
    } else static if (__traits(isIntegral, Bare)) {
      enum kind = (__traits(isUnsigned, Bare) ? NCEntryType.Uint : NCEntryType.Int);
      fl.ncWriteUbyte(cast(ubyte)(kind|Bare.sizeof));
    } else static if (__traits(isFloating, Bare)) {
      fl.ncWriteUbyte(cast(ubyte)(NCEntryType.Float|Bare.sizeof));
    } else static if (is(Bare == struct)) {
      // struct: tag, name length, name
      enum sname = Bare.stringof;
      static assert(sname.length <= 255, "struct name too long: "~sname);
      fl.ncWriteUbyte(NCEntryType.Struct);
      fl.ncWriteUbyte(cast(ubyte)sname.length);
      fl.rawWriteExact(sname[]);
    } else {
      static assert(0, "can't serialize type '"~HT.stringof~"'");
    }
  }

  // emit the data of `val` (no header)
  void emitData(DT) (in ref DT val) {
    alias Leaf = arrayElementType!DT;
    static if (is(DT : E[], E)) {
      // array: every dimension is prefixed with its length
      void emitDims(AT) (AT arr) {
        fl.writeXInt(arr.length);
        static if (isMultiDimArray!AT) {
          foreach (const inner; arr) emitDims(inner);
        } else static if (isSimpleType!Leaf) {
          // write POD arrays in one chunk
          fl.rawWriteExact(arr[]);
        } else {
          foreach (const ref item; arr) emitData(item);
        }
      }
      emitDims(val);
    } else static if (is(DT : VT[KT], KT, VT)) {
      // associative array: pair count, then key data and value data for each pair
      fl.writeXInt(val.length);
      foreach (const pair; val.byKeyValue) {
        emitData(pair.key);
        emitData(pair.value);
      }
    } else static if (isSimpleType!Leaf) {
      fl.rawWriteExact((&val)[0..1]);
    } else static if (is(Leaf == struct)) {
      import std.traits : FieldNameTuple, getUDAs, hasUDA;
      foreach (string fname; FieldNameTuple!Leaf) {
        static if (!hasUDA!(__traits(getMember, Leaf, fname), NCIgnore)) {
          enum renames = getUDAs!(__traits(getMember, Leaf, fname), NCName);
          static if (renames.length) enum wireName = renames[0].name; else enum wireName = fname;
          static assert(wireName.length <= 255, "struct '"~Leaf.stringof~"': field name too long: "~wireName);
          fl.ncWriteUbyte(cast(ubyte)wireName.length);
          fl.rawWriteExact(wireName[]);
          // each field is a complete value with its own header
          fl.ncser(__traits(getMember, val, fname));
        }
      }
      fl.ncWriteUbyte(NCEntryType.End);
    } else {
      static assert(0, "can't serialize type '"~DT.stringof~"'");
    }
  }

  emitHeader!T;
  emitData(v);
}
265 // ////////////////////////////////////////////////////////////////////////// //
// unserialize a value written by `ncser` into `v`
//
// the type header in the stream must match `T` exactly (kind, size, array
// dimension count, struct name), otherwise an `Exception` is thrown
// struct fields are matched by name: the field at the expected position is
// tried first, then all the others; unknown, duplicate, missing or extra
// fields raise an `Exception`
public void ncunser(T, ST) (auto ref ST fl, out T v) if (!is(T == class) && isReadableStream!ST) {
  import std.traits : Unqual;

  // read the type header and check that it describes `HT`
  void expectHeader(HT) () {
    static if (is(HT : E[], E)) {
      if (fl.ncReadUbyte != NCEntryType.Array) throw new Exception(`invalid stream (array expected)`);
      if (fl.ncReadUbyte != dimensionCount!HT) throw new Exception(`invalid stream (dimension count)`);
      expectHeader!(arrayElementType!HT);
    } else static if (is(HT : VT[KT], KT, VT)) {
      // the value type header precedes the key type header
      if (fl.ncReadUbyte != NCEntryType.Dict) throw new Exception(`invalid stream (dict expected)`);
      expectHeader!(Unqual!VT);
      expectHeader!(Unqual!KT);
    } else static if (is(HT == bool)) {
      if (fl.ncReadUbyte != (NCEntryType.Bool|bool.sizeof)) throw new Exception(`invalid stream (bool expected)`);
    } else static if (is(HT == char) || is(HT == wchar) || is(HT == dchar)) {
      if (fl.ncReadUbyte != (NCEntryType.Char|HT.sizeof)) throw new Exception(`invalid stream (char expected)`);
    } else static if (__traits(isIntegral, HT)) {
      enum kind = (__traits(isUnsigned, HT) ? NCEntryType.Uint : NCEntryType.Int);
      if (fl.ncReadUbyte != (kind|HT.sizeof)) throw new Exception(`invalid stream (int expected)`);
    } else static if (__traits(isFloating, HT)) {
      if (fl.ncReadUbyte != (NCEntryType.Float|HT.sizeof)) throw new Exception(`invalid stream (float expected)`);
    } else static if (is(HT == struct)) {
      enum sname = HT.stringof;
      static assert(sname.length <= 255, "struct name too long: "~sname);
      char[255] nbuf = void;
      if (fl.ncReadUbyte != NCEntryType.Struct) throw new Exception(`invalid stream (struct expected)`);
      if (fl.ncReadUbyte != sname.length) throw new Exception(`invalid stream (struct name length)`);
      fl.rawReadExact(nbuf[0..sname.length]);
      if (nbuf[0..sname.length] != sname) throw new Exception(`invalid stream (struct name)`);
    } else {
      static assert(0, "can't unserialize type '"~HT.stringof~"'");
    }
  }

  // read the data of one value (no header) into `dst`
  void loadData(DT) (out DT dst) {
    static if (is(DT : E[], E)) {
      // every dimension is prefixed with its length
      void loadDims(AT) (out AT arr) {
        auto count = fl.readXInt!usize;
        if (count == 0) return;
        static if (__traits(isStaticArray, AT)) {
          // static arrays are filled in place and must match the stored length
          if (arr.length != count) throw new Exception(`invalid stream (array size)`);
          alias target = arr;
        } else {
          // dynamic arrays are built in a mutable buffer and cast to `AT` at the end
          Unqual!(typeof(arr[0]))[] target;
          target.length = count;
        }
        static if (isMultiDimArray!AT) {
          foreach (ref inner; target) loadDims(inner);
        } else {
          alias Leaf = arrayElementType!AT;
          static if (isSimpleType!Leaf) {
            // read POD arrays in one chunk
            fl.rawReadExact(target[]);
          } else {
            foreach (ref item; target) loadData(item);
          }
        }
        static if (!__traits(isStaticArray, AT)) arr = cast(AT)target;
      }
      loadDims(dst);
    } else static if (is(DT : VT[KT], KT, VT)) {
      // pair count, then key data and value data for each pair
      KT key = void;
      VT value = void;
      foreach (immutable _; 0..fl.readXInt!usize) {
        loadData(key);
        loadData(value);
        dst[key] = value;
      }
    } else static if (isSimpleType!DT) {
      fl.rawReadExact((&dst)[0..1]);
    } else static if (is(DT == struct)) {
      import std.traits : FieldNameTuple, getUDAs, hasUDA;

      // one "already seen" bit per field (8 bits are used in each slot)
      ulong[(FieldNameTuple!DT.length+ulong.sizeof-1)/ulong.sizeof] fldseen = 0;

      // if `name` designates field `fldname`, read that field's value and return `true`
      bool tryField(uint idx, string fldname) (const(char)[] name) {
        static if (hasUDA!(__traits(getMember, DT, fldname), NCName)) {
          // with `@NCName` present only the given names are accepted
          enum names = getUDAs!(__traits(getMember, DT, fldname), NCName);
        } else {
          alias tuple(Xs...) = Xs;
          enum names = tuple!(NCName(fldname));
        }
        enum slot = idx/8;
        enum ulong mask = 1UL<<(idx%8);
        foreach (immutable xname; names) {
          if (xname.name == name) {
            if (fldseen[slot]&mask) throw new Exception(`duplicate field value for '`~fldname~`'`);
            fldseen[slot] |= mask;
            fl.ncunser(__traits(getMember, dst, fldname));
            return true;
          }
        }
        return false;
      }

      // slow path: look for `name` among all serializable fields
      void tryAllFields (const(char)[] name) {
        foreach (immutable idx, string fldname; FieldNameTuple!DT) {
          static if (!hasUDA!(__traits(getMember, DT, fldname), NCIgnore)) {
            if (tryField!(idx, fldname)(name)) return;
          }
        }
        throw new Exception("unknown field '"~name.idup~"'");
      }

      char[255] nbuf = void;
      // let's hope that fields are in order
      foreach (immutable idx, string fldname; FieldNameTuple!DT) {
        static if (!hasUDA!(__traits(getMember, DT, fldname), NCIgnore)) {
          auto nlen = fl.ncReadUbyte;
          if (nlen == NCEntryType.End) throw new Exception("invalid stream (out of fields)");
          fl.rawReadExact(nbuf[0..nlen]);
          if (!tryField!(idx, fldname)(nbuf[0..nlen])) tryAllFields(nbuf[0..nlen]);
        }
      }
      if (fl.ncReadUbyte != NCEntryType.End) throw new Exception("invalid stream (extra fields)");
    }
  }

  expectHeader!T;
  loadData(v);
}
389 // ////////////////////////////////////////////////////////////////////////// //
// true if `T` is an array whose elements are themselves array-like
// (have a length, or are narrow strings)
template isMultiDimArray(T) {
  private import std.range.primitives : hasLength;
  private import std.traits : isArray, isNarrowString;
  static if (!isArray!T) {
    enum isMultiDimArray = false;
  } else {
    alias ElemT = typeof(T.init[0]);
    enum isMultiDimArray = (hasLength!ElemT || isNarrowString!ElemT);
  }
}
static assert(isMultiDimArray!(string[]) == true);
static assert(isMultiDimArray!string == false);
static assert(isMultiDimArray!(int[int]) == false);
// number of array dimensions of `T`; 0 for anything that is not an array
template dimensionCount(T) {
  private import std.range.primitives : hasLength;
  private import std.traits : isArray, isNarrowString;
  static if (!isArray!T) {
    enum dimensionCount = 0;
  } else {
    alias ElemT = typeof(T.init[0]);
    static if (hasLength!ElemT || isNarrowString!ElemT) {
      // array-like element: count its dimensions too
      enum dimensionCount = 1+dimensionCount!ElemT;
    } else {
      enum dimensionCount = 1;
    }
  }
}
static assert(dimensionCount!string == 1);
static assert(dimensionCount!(int[int]) == 0);
// unqualified innermost element type of a (possibly multi-dimensional) array;
// for a non-array type it is just the unqualified type
template arrayElementType(T) {
  private import std.traits : isArray, Unqual;
  static if (!isArray!T) {
    static if (is(typeof(T))) {
      alias arrayElementType = Unqual!(typeof(T));
    } else {
      alias arrayElementType = Unqual!T;
    }
  } else {
    alias arrayElementType = arrayElementType!(typeof(T.init[0]));
  }
}
static assert(is(arrayElementType!string == char));
440 // ////////////////////////////////////////////////////////////////////////// //
version(ncserial_test) unittest {
  import iv.vfs;

  // ////////////////////////////////////////////////////////////////////////// //
  static struct AssemblyInfo {
    uint id;
    string name;
    @NCIgnore uint ignoreme;
  }

  static struct ReplyAsmInfo {
    @NCName("command") @NCName("xcommand") ubyte cmd;
    @NCName("values") AssemblyInfo[][2] list;
    uint[string] dict;
    bool fbool;
    char[3] ext;
  }

  // ////////////////////////////////////////////////////////////////////////// //
  // build the sample value used by both round-trip tests
  static ReplyAsmInfo makeSample () {
    ReplyAsmInfo ri;
    ri.cmd = 42;
    ri.list[0] ~= AssemblyInfo(666, "hell");
    ri.list[1] ~= AssemblyInfo(69, "fuck");
    ri.dict["foo"] = 42;
    ri.dict["boo"] = 666;
    ri.fbool = true;
    ri.ext = "elf";
    return ri;
  }

  // check that `xf` holds exactly what `makeSample` produced
  static void checkSample (ref ReplyAsmInfo xf) {
    assert(xf.cmd == 42);
    assert(xf.list.length == 2);
    assert(xf.list[0].length == 1);
    assert(xf.list[1].length == 1);
    assert(xf.list[0][0].id == 666);
    assert(xf.list[0][0].name == "hell");
    assert(xf.list[1][0].id == 69);
    assert(xf.list[1][0].name == "fuck");
    assert(xf.dict.length == 2);
    assert(xf.dict["foo"] == 42);
    assert(xf.dict["boo"] == 666);
    assert(xf.fbool == true);
    assert(xf.ext == "elf");
  }

  // ////////////////////////////////////////////////////////////////////////// //
  // round trip through the disk file "z00.bin"
  void test0 () {
    auto ri = makeSample();
    {
      auto fl = VFile("z00.bin", "w");
      fl.ncser(ri);
    }
    {
      ReplyAsmInfo xf;
      auto fl = VFile("z00.bin");
      fl.ncunser(xf);
      assert(fl.tell == fl.size);
      checkSample(xf);
    }
  }

  // round trip through a memory stream, then `ncskip` and `ncread` over the same data
  void test1 () {
    auto ri = makeSample();
    auto mem = wrapMemoryRW(null);
    mem.ncser(ri);

    mem.seek(0);
    ReplyAsmInfo xf;
    mem.ncunser(xf);
    assert(mem.tell == mem.size);
    checkSample(xf);

    mem.seek(0);
    mem.ncskip;
    assert(mem.tell == mem.size);

    auto m2 = mem.ncread;
    assert(mem.tell == mem.size);
    assert(m2.tell == mem.size);
  }

  // NOTE(review): no call to test0() or test1() is visible in this block, so
  // the checks above apparently never execute -- confirm and invoke them if
  // that is unintended.
}
536 // ////////////////////////////////////////////////////////////////////////// //
537 // simple RPC system
538 private import std.traits;
/// Packet codes exchanged over the RPC channel (written as `ushort`).
public enum RPCommand : ushort {
  Call    = 0x29a, /// client request: call header and marshalled arguments follow
  RetVoid = 0x29b, /// reply: call succeeded, no result data follows
  RetRes  = 0x29c, /// reply: call succeeded, serialized result follows
  Err     = 0x29d, /// reply: call failed, serialized error message follows
}
548 // ////////////////////////////////////////////////////////////////////////// //
private alias Id(alias T) = T;

/// One registered server-side endpoint.
private struct RPCEndPoint {
  string name; /// lookup key (prefix plus function name)
  ubyte[32] hash; /// signature hash as computed by `rpchash`
  bool isFunction;
  /// read args, do call, write result; throws on error; returns serialized res
  VFile delegate (VFile fi) dg;
}

/// Registered endpoints, keyed by endpoint name.
private RPCEndPoint[string] endpoints;
562 // ////////////////////////////////////////////////////////////////////////// //
// strip one pair of surrounding double quotes (if any), then return
// everything after the last '.' (or the whole string if there is no dot)
private string nodots (string s) {
  if (s.length > 2 && s[0] == '"' && s[$-1] == '"') s = s[1..$-1];
  foreach_reverse (immutable idx, immutable char ch; s) {
    if (ch == '.') return s[idx+1..$];
  }
  return s;
}
571 // ////////////////////////////////////////////////////////////////////////// //
/// Names of all currently registered endpoints.
public string[] rpcEndpointNames () {
  return endpoints.keys;
}
575 // ////////////////////////////////////////////////////////////////////////// //
/// SHA-256 over the textual signature of `func`: the return type followed by
/// every parameter type, each terminated with a comma.
/// The function name is not part of the hash.
public static ubyte[32] rpchash(alias func) () if (isCallable!func) {
  import std.digest.sha;
  SHA256 digest;
  digest.start();

  // feed one signature component followed by the separator
  void feed (const(void)[] part) {
    digest.put(cast(const(ubyte)[])part);
    digest.put(cast(const(ubyte)[])",");
  }

  //feed(nodots(fullyQualifiedName!func.stringof));
  feed(ReturnType!func.stringof);
  foreach (immutable par; Parameters!func) feed(par.stringof);
  return digest.finish();
}
596 // ////////////////////////////////////////////////////////////////////////// //
// build the source text of a call to `func` that takes its arguments from
// the fields `mr.a0`, `mr.a1`, ... (one per parameter)
private string BuildCall(alias func) () {
  import std.conv : to;
  string call = "func(";
  foreach (immutable idx, immutable par; Parameters!func) {
    call ~= "mr.a"~idx.to!string~",";
  }
  call ~= ")";
  return call;
}
609 // ////////////////////////////////////////////////////////////////////////// //
// mix in one field per parameter of `func`, named `a0`, `a1`, ...;
// a parameter's default value (if any) becomes the field initializer
private static mixin template BuildRPCArgs (alias func) {
  private import std.traits;
  private static string buildIt(alias func) () {
    import std.conv : to;
    alias defaults = ParameterDefaults!func;
    string decls;
    foreach (immutable idx, immutable par; Parameters!func) {
      decls ~= par.stringof~" a"~to!string(idx);
      static if (!is(defaults[idx] == void)) decls ~= " = "~defaults[idx].stringof;
      decls ~= ";\n";
    }
    return decls;
  }
  mixin(buildIt!func);
}
629 // ////////////////////////////////////////////////////////////////////////// //
/// Header serialized right after `RPCommand.Call`.
public struct RPCCallHeader {
  string name; /// fqn
  ubyte[32] hash; /// signature hash; all zeroes disables the signature check on the server
}
636 // ////////////////////////////////////////////////////////////////////////// //
// copy everything that can still be read from `from` to `to`
private void fcopy (VFile to, VFile from) {
  ubyte[64] chunk = void;
  auto got = from.rawRead(chunk[]);
  while (got.length != 0) {
    to.rawWriteExact(got[]);
    got = from.rawRead(chunk[]);
  }
}
647 // ////////////////////////////////////////////////////////////////////////// //
// client will use this
// it will send RPCommand.Call, the call header and the marshalled arguments,
// then wait for the reply
// the endpoint name is `prefix~name`, or `prefix` plus the unqualified name
// of `func` when `name` is empty
// throws on fatal stream error, on an error reply ("RPC ERROR: ...") and on
// an unknown reply code
public static auto rpcall(alias func, string prefix=null, string name=null, ST, A...) (auto ref ST chan, A args)
if (isRWStream!ST && (is(typeof(func) == function) || is(typeof(func) == delegate)))
{
  //pragma(msg, "type: ", typeof(func));
  //pragma(msg, "prot: ", __traits(getProtection, func));
  import std.traits;
  import std.conv : to;

  // arguments travel as a struct with one field per parameter of `func`
  static struct RPCMarshalArgs { mixin BuildRPCArgs!func; }
  alias defs = ParameterDefaults!func;
  static assert(A.length <= defs.length, "too many arguments");
  static if (A.length < defs.length) static assert(!is(defs[A.length] == void), "not enough default argument values");

  RPCMarshalArgs mr;
  foreach (immutable idx, ref arg; args) {
    mixin("mr.a"~to!string(idx)~" = arg;");
  }

  RPCCallHeader hdr;
  static if (name.length > 0) {
    hdr.name = prefix~name;
  } else {
    hdr.name = prefix~nodots(fullyQualifiedName!func.stringof);
  }
  hdr.hash = rpchash!func;

  // call
  chan.writeNum!ushort(RPCommand.Call);
  chan.ncser(hdr);
  chan.ncser(mr);

  // result
  auto replyCode = chan.readNum!ushort;
  if (replyCode == RPCommand.Err) {
    string msg;
    chan.ncunser(msg);
    throw new Exception("RPC ERROR: "~msg);
  }
  if (replyCode != RPCommand.RetRes && replyCode != RPCommand.RetVoid) throw new Exception("invalid RPC reply");

  static if (is(ReturnType!func == void)) {
    // nothing to return; an unexpected result is read and dropped
    if (replyCode == RPCommand.RetRes) chan.ncskip;
    return;
  } else {
    // a result-less reply yields the default value
    if (replyCode == RPCommand.RetVoid) return ReturnType!func.init;
    ReturnType!func rval;
    chan.ncunser(rval);
    return rval;
  }
}
706 // ////////////////////////////////////////////////////////////////////////// //
// client will use this
// it will send RPCommand.Call for the endpoint called `name`; the argument
// struct is built from the types of `args`, and the signature hash is sent
// as all zeroes
// throws on fatal stream error, on an error reply ("RPC ERROR: ...") and on
// an unknown reply code
public static RT rpcallany(RT, ST, A...) (auto ref ST chan, const(char)[] name, A args) if (isRWStream!ST) {
  import std.traits;
  import std.conv : to;

  // source text for the marshalling struct fields: one `aN` per argument type
  string buildFields () {
    string decls;
    foreach (immutable idx, const tp; A) {
      decls ~= tp.stringof~" a"~to!string(idx)~";\n";
    }
    return decls;
  }

  static struct RPCMarshalArgs { mixin(buildFields()); /*pragma(msg, buildFields());*/ }
  RPCMarshalArgs mr;
  foreach (immutable idx, ref arg; args) {
    mixin("mr.a"~to!string(idx)~" = arg;");
  }

  RPCCallHeader hdr;
  hdr.name = cast(string)name; // it is safe to cast it here
  hdr.hash[] = 0;

  // call
  chan.writeNum!ushort(RPCommand.Call);
  chan.ncser(hdr);
  chan.ncser(mr);

  // result
  auto replyCode = chan.readNum!ushort;
  if (replyCode == RPCommand.Err) {
    string msg;
    chan.ncunser(msg);
    throw new Exception("RPC ERROR: "~msg);
  }
  if (replyCode != RPCommand.RetRes && replyCode != RPCommand.RetVoid) throw new Exception("invalid RPC reply");

  static if (is(RT == void)) {
    // nothing to return; an unexpected result is read and dropped
    if (replyCode == RPCommand.RetRes) chan.ncskip;
    return;
  } else {
    // a result-less reply yields the default value
    if (replyCode == RPCommand.RetVoid) return RT.init;
    RT rval;
    chan.ncunser(rval);
    return rval;
  }
}
765 // ////////////////////////////////////////////////////////////////////////// //
// register RPC endpoint (server-side)
// the endpoint is stored under `prefix~name`; when `name` is empty, the
// unqualified name of `Dg` is used instead, i.e. specifying only a prefix
// prepends it to the function name
// an endpoint registered earlier under the same name is replaced
public static void rpcRegisterEndpoint(alias Dg) (typeof(Dg) func, const(char)[] prefix=null, const(char)[] name=null)
if (is(typeof(Dg) == function) || is(typeof(Dg) == delegate))
{
  import std.traits : FieldNameTuple, getUDAs, hasUDA;
  import std.digest.sha;

  immutable string shortName = (name.length ? name.idup : nodots(fullyQualifiedName!Dg.stringof));

  RPCEndPoint ep;
  ep.name = prefix.idup~shortName;
  //{ import std.stdio; stderr.writeln("name: [", ep.name, "]"); }
  ep.hash = rpchash!Dg;
  ep.dg = delegate (VFile fi) {
    // unmarshal the arguments, call `func`, serialize its result (if any)
    static struct RPCMarshalArgs { mixin BuildRPCArgs!Dg; }
    RPCMarshalArgs mr;
    fi.ncunser(mr);
    auto fo = wrapMemoryRW(null);
    static if (is(ReturnType!Dg == void)) {
      mixin(BuildCall!Dg~";");
    } else {
      mixin("fo.ncser("~BuildCall!Dg~");");
    }
    // rewind, so the caller can read the result from the start
    fo.seek(0);
    return fo;
  };
  endpoints[ep.name] = ep;
}
800 // ////////////////////////////////////////////////////////////////////////// //
// server will use this; RPCommand.Call already read
// reads the call header and the arguments, runs the endpoint and writes the reply
// returns `true` if the call was executed, `false` if an error reply was sent
// (unknown endpoint, signature mismatch, or the endpoint threw an `Exception`)
// throws on unrecoverable stream error
public static bool rpcProcessCall(ST) (auto ref ST chan) if (isRWStream!ST) {
  RPCCallHeader hdr;
  chan.ncunser(hdr);

  // drop the still unread arguments and send an error reply
  bool rejectCall (string msg) {
    chan.ncskip;
    chan.writeNum!ushort(RPCommand.Err);
    chan.ncser(msg);
    return false;
  }

  auto epp = hdr.name in endpoints;
  if (epp is null) return rejectCall("unknown function '"~hdr.name~"'");

  // an all-zero hash means "don't check the signature"
  bool hasHash = false;
  foreach (immutable ubyte b; hdr.hash) {
    if (b != 0) { hasHash = true; break; }
  }
  if (hasHash && epp.hash != hdr.hash) return rejectCall("invalid signature for function '"~hdr.name~"'");

  // the arguments are read completely before the endpoint runs
  auto rdf = chan.ncread;
  VFile rf;
  try {
    rf = epp.dg(rdf);
  } catch (Exception e) {
    chan.writeNum!ushort(RPCommand.Err);
    chan.ncser("EXCEPTION: "~e.msg);
    return false;
  }

  if (rf.size > 0) {
    // the endpoint produced a serialized result: copy it to the channel
    chan.writeNum!ushort(RPCommand.RetRes);
    ubyte[512] buf = void;
    for (;;) {
      auto rd = rf.rawRead(buf[]);
      if (rd.length == 0) break;
      chan.rawWriteExact(rd[]);
    }
  } else {
    chan.writeNum!ushort(RPCommand.RetVoid);
  }
  return true;
}
848 // ////////////////////////////////////////////////////////////////////////// //
/** unix domain socket without inode
 *
 * The socket address starts with a zero byte, so no filesystem entry is
 * created. The object is a reference-counted handle: copies share one file
 * descriptor, which is closed when the last copy goes away (unless it was
 * detached).
 *
 * for server:
 *
 * ---------
 * UDSocket sk;
 * sk.create("/k8/rpc-test");
 * auto cl = sk.accept();
 * ---------
 *
 * for client:
 *
 * ---------
 * UDSocket sk;
 * sk.connect("/k8/rpc-test");
 * ---------
 */
public struct UDSocket {
private:
  // shared state, allocated with `malloc` and released by `decRef`
  static struct UDSData {
    uint rc; // reference count
    int fd;
    uint bytesSent;
    uint bytesReceived;
    bool didlisten; // `listen` was already called on `fd`
    bool dontclose; // `fd` was detached: don't close it with the last reference
    @disable this (this);
  }

private:
  usize udsp; // pointer to `UDSData` stored as an integer; 0 means "closed"

  // drop our reference; the last one closes the fd (if not detached) and frees the data
  void decRef () nothrow @nogc {
    if (!udsp) return;
    auto uds = cast(UDSData*)udsp;
    if (--uds.rc == 0) {
      import core.stdc.stdlib : free;
      import core.sys.posix.unistd : close;
      if (!uds.dontclose) close(uds.fd);
      free(uds);
    }
    udsp = 0;
  }

public:
  this (this) nothrow @nogc { pragma(inline, true); if (udsp) ++(cast(UDSData*)udsp).rc; } ///
  ~this () nothrow @nogc { pragma(inline, true); if (udsp) close(); } ///

  ///
  void opAssign (UDSocket sk) {
    pragma(inline, true);
    // take the new reference before dropping the old one, so self-assignment is safe
    if (sk.udsp) ++(cast(UDSData*)sk.udsp).rc;
    close();
    udsp = sk.udsp;
  }

  @property bool isOpen () const nothrow @trusted @nogc { pragma(inline, true); return (udsp != 0); } ///
  @property int fd () const nothrow @trusted @nogc { pragma(inline, true); return (udsp != 0 ? (cast(UDSData*)udsp).fd : -1); } ///

  void close () nothrow @nogc { pragma(inline, true); if (udsp) decRef(); } ///
  void create (const(char)[] name) { doCC!"server"(name); } ///
  void connect (const(char)[] name) { doCC!"client"(name); } ///

  @property uint bytesSent () const nothrow @trusted @nogc { pragma(inline, true); return (udsp != 0 ? (cast(UDSData*)udsp).bytesSent : 0); } ///
  @property uint bytesReceived () const nothrow @trusted @nogc { pragma(inline, true); return (udsp != 0 ? (cast(UDSData*)udsp).bytesReceived : 0); } ///

  @property void resetBytesSent () nothrow @trusted @nogc { pragma(inline, true); if (udsp != 0) (cast(UDSData*)udsp).bytesSent = 0; } ///
  @property void resetBytesReceived () nothrow @trusted @nogc { pragma(inline, true); if (udsp != 0) (cast(UDSData*)udsp).bytesReceived = 0; } ///

  /// start listening (backlog of 1); does nothing if already listening
  void listen () {
    if (!udsp) throw new Exception("can't listen on closed socket");
    auto uds = cast(UDSData*)udsp;
    if (!uds.didlisten) {
      import core.sys.posix.sys.socket : listen;
      if (listen(uds.fd, 1) != 0) throw new Exception("listen failed");
      uds.didlisten = true;
    }
  }

  /// accept one connection (calls `listen` first if necessary)
  UDSocket accept () {
    listen();
    auto uds = cast(UDSData*)udsp;
    assert(uds.didlisten);
    import core.sys.posix.sys.socket : accept;
    int cfd = accept(uds.fd, null, null);
    if (cfd == -1) throw new Exception("accept failed");
    UDSocket res;
    res.assignFD(cfd);
    return res;
  }

  /// detach fd: return it and mark it as "never close", then drop this reference
  int detach () {
    if (!udsp) throw new Exception("can't detach closed socket");
    auto uds = cast(UDSData*)udsp;
    int rfd = uds.fd;
    uds.dontclose = true;
    close();
    return rfd;
  }

  /// read up to `buf.length` bytes; returns the filled part of `buf`
  /// (empty when `recv` reports zero bytes)
  void[] rawRead (void[] buf) {
    import core.sys.posix.sys.socket : recv;
    if (!udsp) throw new Exception("can't read from closed socket");
    auto uds = cast(UDSData*)udsp;
    if (buf.length == 0) return buf[];
    auto rd = recv(uds.fd, buf.ptr, buf.length, 0);
    if (rd < 0) throw new Exception("socket read error");
    uds.bytesReceived += rd;
    return buf[0..rd];
  }

  /// write the whole buffer, looping over partial sends
  void rawWrite (const(void)[] buf) {
    import core.sys.posix.sys.socket : send, MSG_NOSIGNAL;
    if (!udsp) throw new Exception("can't write to closed socket");
    auto uds = cast(UDSData*)udsp;
    auto dp = cast(const(ubyte)*)buf.ptr;
    auto left = buf.length;
    while (left > 0) {
      // MSG_NOSIGNAL: a closed peer must surface as the exception below, not as SIGPIPE
      auto wr = send(uds.fd, dp, left, MSG_NOSIGNAL);
      if (wr <= 0) throw new Exception("socket write error");
      uds.bytesSent += wr;
      dp += wr;
      left -= wr;
    }
  }

private:
  // drop the current reference and take ownership of `fd` (a negative `fd` leaves us closed)
  // on allocation failure `fd` is closed before throwing
  void assignFD (int fd) {
    import core.stdc.stdlib : malloc;
    import core.stdc.string : memset;
    close();
    if (fd >= 0) {
      auto uds = cast(UDSData*)malloc(UDSData.sizeof);
      if (uds is null) {
        import core.sys.posix.unistd : close;
        close(fd);
        throw new Exception("out of memory"); // let's hope that we can do it
      }
      memset(uds, 0, (*uds).sizeof);
      uds.rc = 1;
      uds.fd = fd;
      udsp = cast(usize)uds;
    }
  }

  // close the current socket, then create ("server") or connect ("client") a new one
  void doCC(string mode) (const(char)[] name) {
    static assert(mode == "client" || mode == "server", "invalid mode");
    close();
    // `makeUADS` either throws or returns a valid descriptor
    assignFD(makeUADS!mode(name));
  }

  // create the socket and bind ("server") or connect ("client") it to `name`
  // returns the descriptor; throws (after closing it) on failure
  static int makeUADS(string mode) (const(char)[] name) {
    static assert(mode == "client" || mode == "server", "invalid mode");
    import core.stdc.string : memset;
    import core.sys.posix.sys.socket;
    import core.sys.posix.sys.un : sockaddr_un;
    import core.sys.posix.unistd : close;
    // max name length is 108, so be safe here
    if (name.length == 0 || name.length > 100) throw new Exception("invalid name");
    //{ import core.stdc.stdio; printf("[%.*s]\n", cast(uint)name.length, name.ptr); }
    sockaddr_un sun = void;
    memset(&sun, 0, sun.sizeof);
    sun.sun_family = AF_UNIX;
    // create domain socket without FS inode (first byte of name buffer should be zero)
    sun.sun_path[1..1+name.length] = cast(byte[])name[];
    int fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd < 0) throw new Exception("can't create unix domain socket");
    static if (mode == "server") {
      import core.sys.posix.sys.socket : bind;
      if (bind(fd, cast(sockaddr*)&sun, sun.sizeof) != 0) { close(fd); throw new Exception("can't bind unix domain socket"); }
    } else {
      import core.sys.posix.sys.socket : connect;
      if (connect(fd, cast(sockaddr*)&sun, sun.sizeof) != 0) {
        close(fd);
        throw new Exception("can't connect to unix domain socket");
      }
    }
    //{ import core.stdc.stdio; printf("fd=%d\n", fd); }
    return fd;
  }
}