Chat API update. Breaks everything.
[brdnet.git] / Chat.pas
bloba8370bd5511e5172521a4c6321b3baa44fedf72d
1 unit Chat;
3 { Implement two-way reliable acked lock-step protocol }
5 INTERFACE
6 uses NetAddr,ServerLoop,MemStream;
type
  { One end of a chat: an acknowledged, lock-step message exchange with a
    single remote peer. Packets carry a 5 byte header (opcode, seq, ack_seq)
    followed by the payload. }
  tChat = object
    remote: tNetAddr;      {peer this chat sends to and receives from}
    opcode: byte;          {first byte of every packet; handler key together with remote}
    txSeq: Word;           {sequence number of the last message sent}
    rxSeq: Word;           {sequence number of the last new data message received}
    rxAcked: boolean;      {false while rxSeq still has to be acknowledged}
    closed: boolean;       {set by Close}
    RTT: LongWord;         {in ms; starts at 200, measured on ack, doubled on every resend}
    {called with data=true for a new data message and with data=false when
     the last sent message was acknowledged; client must maintain active chats}
    Callback: procedure(msg:tSMsg; data:boolean) of object;
    {called when the ack timeout is exceeded or the reply timeout fires}
    OnTimeout: procedure of object;
    {when assigned, called at the end of the chat instead of freeing the object}
    OnDispose: procedure of object;

    {set up a new outgoing chat to iremote on a randomly chosen opcode}
    procedure Init(const iremote:tNetAddr);
    {set ack and reply timeouts (ms); OnTimeout must be assigned beforehand}
    procedure SetTimeout(acktm,repltm:LongInt);
    {skip the 5 header bytes in s}
    procedure AddHeaders(var s:tMemoryStream);
    {allocate a buffer for l payload bytes plus header and skip the header}
    procedure StreamInit(var s:tMemoryStream; l:word);
    {fill in the header and transmit; the chat keeps the buffer for resends.
     the stream can be invalidated, but the buffer must not be modified or freed}
    procedure Send(s:tMemoryStream);
    {send a bare acknowledgement if the last received message is not acked yet}
    procedure Ack;
    {acknowledge, detach the callbacks and let the chat end}
    procedure Close;

  private
    txPk: pointer;         {last sent, not acked msg}
    txLen: word;           {length of txPk; 0 when nothing is in flight}
    txTime: tDateTime;     {time of the last (re)transmission}
    tmAck, tmReply: LongWord; {ms}
    procedure InitFrom(const iremote:tNetAddr; iopcode:byte);
    procedure Done;
    procedure Resend;
    procedure OnReply(msg:tSMsg);
    procedure ReplyTimeout;
  end;
type
  { Handler invoked for a newly received chat; nchat is the freshly set up
    chat object and msg the message that opened it. }
  tChatHandler = procedure(var nchat:tChat; msg:tSMsg);

{ Register handler as the receiver of new chats whose first payload byte
  equals initcode. }
procedure SetChatHandler(initcode:byte; handler:tChatHandler);
41 { download manager creates FileRequest
42 FileRequest opens chat session to server
43 upmgr accepts chat and sends reply
44 FileRequest acks, chat is then closed after TimeWait
45 upmgr starts TC transfer
46 transfer finished, upmgr send new Chat to FileRequest
47 FileRequest acks, chat is closed on both ends
48 FileRequest can open new chat if blocks are missing
50 => chat msgs must be created with New, disposed by Chat
51 => during TimeWait nothing references the Chat except the scheduler (Shedule); the Chat disposes itself. }
54 { Chats are the HiMsg. Use hash table from ServerLoop, works for HiMsg too. }
56 IMPLEMENTATION
57 uses SysUtils;
{ Set up a new outgoing chat to iremote.
  A random opcode in 128..255 is picked; while that opcode already has a
  message handler for this remote, the next one is tried. The search wraps
  from 255 back to 128 so the opcode can neither overflow the byte nor leave
  the range it was drawn from.
  Raises Exception when all 128 opcodes are in use for this remote. }
procedure tChat.Init(const iremote:tNetAddr);
  var tries:integer;
begin
  remote:=iremote;
  opcode:=128+Random(128);
  tries:=0;
  while ServerLoop.IsMsgHandled(opcode,remote) do begin
    Inc(tries);
    {every candidate has been checked once: give up instead of looping forever}
    if tries>=128 then
      raise Exception.Create('Chat: no free opcode for remote');
    {128..254 -> +1, 255 -> 128}
    opcode:=128+((opcode-127) and 127);
  end;
  InitFrom(remote,opcode);
end;
{ Common initialisation for outgoing and incoming chats: binds the chat to
  (iopcode, iremote), registers OnReply as the message handler for that pair
  and resets all protocol state. }
procedure tChat.InitFrom(const iremote:tNetAddr; iopcode:byte);
begin
  {identity and message routing}
  remote:=iremote;
  opcode:=iopcode;
  SetMsgHandler(opcode,remote,@OnReply);

  {sequence state}
  txSeq:=0;
  rxSeq:=0;
  rxAcked:=true; {to not ack pk 0}
  closed:=false;

  {nothing in flight}
  txPk:=nil;
  txLen:=0;

  {no client hooks yet}
  Callback:=nil;
  OnTimeout:=nil;
  OnDispose:=nil;

  {timing}
  RTT:=200; {a default for timeouts}
  txTime:=0;
  tmAck:=0;
  tmReply:=0;
end;
84 {struct
85 opcode:byte
86 seq:2
87 ack_seq:2
88 data:xx }
{ Advance s past the space reserved for the chat header, which Send fills
  in later. }
procedure tChat.AddHeaders(var s:tMemoryStream);
  const cHeaderLen=5; {opcode:1 + seq:2 + ack_seq:2}
begin
  s.Skip(cHeaderLen);
end;
{ Prepare s for a message with up to l payload bytes: allocates a buffer of
  l+5 bytes, initialises the stream on it and skips the header area. }
procedure tChat.StreamInit(var s:tMemoryStream; l:word);
  var total:LongWord;
      buf:pointer;
begin
  total:=l+5; {payload plus header}
  buf:=GetMem(total);
  s.Init(buf,0,total);
  AddHeaders(s);
end;
{ Store the timeouts, both in ms.
  acktm is compared against the growing RTT in Resend; repltm is the delay
  after which ReplyTimeout is scheduled once our message has been acked.
  OnTimeout has to be assigned before calling this (asserted). }
procedure tChat.SetTimeout(acktm,repltm:LongInt);
begin
  Assert(Assigned(OnTimeout));
  tmReply:=repltm;
  tmAck:=acktm;
end;
{ Transmit the message held in s.
  The header at the start of the buffer is filled in with the opcode, the
  next tx sequence number and, when the last received message has not been
  acknowledged yet, its sequence number as a piggy-backed ack (otherwise 0).
  The chat takes over the buffer (s.base) for retransmission and frees it
  later; a previous packet that is still unacked is dropped. }
procedure tChat.Send(s:tMemoryStream);
  var ackField:Word;
begin
  {forget the previous in-flight packet, if any}
  if txLen>0 then begin
    FreeMem(txPk,txLen);
    UnShedule(@Resend);
  end;
  {note: a Callback is not required here (the assert was disabled)}
  Inc(txSeq);

  {decide what goes into the ack_seq field}
  if rxAcked then
    ackField:=0
  else begin
    ackField:=rxSeq;
    rxAcked:=true;
  end;

  {write the header over the reserved bytes}
  s.Seek(0);
  s.WriteByte(opcode);
  s.WriteWord(txSeq,2);
  s.WriteWord(ackField,2);

  {remember the packet and send it}
  txPk:=s.base;
  txLen:=s.Length;
  ServerLoop.SendMessage(txPk^,txLen,remote);
  ServerLoop.Shedule(RTT*2,@Resend);
  txTime:=Now;
end;
{ Send a header-only packet (seq=0, ack_seq=rxSeq) acknowledging the last
  received message. Does nothing when that message is already acked. }
procedure tChat.Ack;
  var pk:tMemoryStream;
begin
  if rxAcked then exit;
  pk.Init(GetMem(5),0,5);
  pk.WriteByte(opcode);
  pk.WriteWord(0,2);      {no data in this packet}
  pk.WriteWord(rxSeq,2);
  ServerLoop.SendMessage(pk.base^,pk.length,remote);
  FreeMem(pk.base,pk.length);
  rxAcked:=true;
end;
{ Close the chat from the client side: acknowledges anything pending, marks
  the chat closed and detaches Callback and OnTimeout so they are no longer
  called. With nothing in flight, Done is scheduled after 5000 ms; otherwise
  this procedure schedules nothing and the pending packet stays in flight.
  Must not be called twice (asserted). }
procedure tChat.Close;
begin
  Assert(not closed);
  Ack;
  closed:=true;
  Callback:=nil; {avoid calling}
  OnTimeout:=nil;
  if txLen<>0 then exit; {a packet is still in flight}
  Shedule(5000{todo},@Done); {wait for something lost}
end;
{ Tear the chat down: frees the in-flight packet, removes the message
  handler, cancels every scheduled task of this chat and finally hands the
  object to OnDispose or, when no OnDispose is assigned, frees it.
  The packet fields are cleared and a still-pending scheduled Done is
  cancelled, so that a second Done can neither free the packet twice nor run
  on an already freed object. }
procedure tChat.Done;
begin
  if txLen>0 then begin
    FreeMem(txPk,txLen);
    txPk:=nil;
    txLen:=0;
  end;
  SetMsgHandler(opcode,remote,nil);
  UnShedule(@Resend);
  UnShedule(@ReplyTimeout);
  UnShedule(@Done); {e.g. the delayed Done queued by Close}
  if assigned(OnDispose) then OnDispose
  else FreeMem(@self,sizeof(self)); {self must not be touched after this}
end;
{ Scheduled task: no ack arrived in time for the packet in flight.
  Doubles RTT (used as the retry interval), then either gives up or
  retransmits the packet and schedules the next attempt.
  - closed chat: ends through Done once RTT exceeds 5000 ms
  - open chat with an ack timeout set: once RTT exceeds tmAck, OnTimeout is
    called (when assigned) and the chat ends through Done }
procedure tChat.Resend;
begin
  {back off}
  if RTT<1 then RTT:=2;
  RTT:=RTT*2;

  {give up?}
  if closed then begin
    if RTT>5000 then begin
      Done;
      exit;
    end;
  end
  else if (tmAck>0) and (RTT>tmAck) then begin
    if Assigned(OnTimeout) then OnTimeout;
    Done;
    exit;
  end;

  {retransmit and try again after RTT ms}
  ServerLoop.SendMessage(txPk^,txLen,remote);
  txTime:=Now;
  ServerLoop.Shedule(RTT,@Resend);
end;
{ Message handler for packets of this chat.
  Reads the header (opcode, seq, ack_seq) and then
  - ack_seq>0: when it acknowledges the packet currently in flight, updates
    RTT, releases the packet, stops resending, notifies Callback(msg,false)
    and arms the reply timeout; acks of older packets are ignored;
  - seq>0: a repeated message (seq<=rxSeq) is answered with a bare ack again,
    a new one is recorded as unacked and passed to Callback(msg,true).
  The measured RTT is clamped to at least 1 ms: a value of 0 would make Send
  schedule the resend with no delay, and a negative value (clock stepped
  back) does not fit the unsigned RTT field. }
procedure tChat.OnReply(msg:tSMsg);
  var seq,aseq:Word;
  var s:tMemoryStream;
  var elapsed:Int64;
begin
  msg.stream.skip(1{opcode});
  seq:=msg.stream.ReadWord(2);
  aseq:=msg.stream.ReadWord(2);
  if aseq>0 then {ack of our msg} begin
    if (aseq=txSeq)and(txLen>0) {it is current} then begin
      if txTime>0 then begin
        elapsed:=Round((Now-txTime)*MsecsPerDay);
        if elapsed<1 then elapsed:=1;
        RTT:=elapsed;
      end;
      FreeMem(txPk,txLen);
      UnShedule(@Resend);
      if Closed then Shedule(5,@Done);{wtf?}
      TxLen:=0;
      txPk:=nil;
      if assigned(callback) then callback(msg,false);
      if assigned(OnTimeout) and (tmReply>0) then Shedule(tmReply,@ReplyTimeout);
    end else {it is ack of old data, do nothing};
  end;
  if seq>0 then {some data} begin
    if seq<=rxSeq then {remote didnt get our ack} begin
      {repeat the ack as a header-only packet}
      s.Init(GetMem(5),0,5);
      s.WriteByte(opcode);
      s.WriteWord(0,2);
      s.WriteWord(rxSeq,2);
      ServerLoop.SendMessage(s.base^,s.length,remote);
      FreeMem(s.base,s.length);
      if seq=rxSeq then rxacked:=true;
    end else begin
      {some useful data!}
      rxSeq:=seq;
      rxAcked:=false;
      UnShedule(@ReplyTimeout); {the awaited reply has arrived}
      if assigned(callback) then callback(msg,true);
    end;
  end;
end;
{ Scheduled task: the peer acknowledged our message but sent no reply within
  tmReply ms. Reports this through OnTimeout, which must be assigned
  (asserted). The chat is not torn down here. }
procedure tChat.ReplyTimeout;
begin
  Assert(Assigned(OnTimeout));
  OnTimeout;
  {the original leaves a placeholder here; nothing else is done}
end;
{ Handlers for incoming chats, indexed by initcode. }
var ChatHandlers: array [1..32] of tChatHandler;

{ Register handler for new chats that start with initcode.
  initcode must lie within the bounds of ChatHandlers; an Exception is
  raised otherwise, because the index would address memory outside the
  table. A slot may be registered only once (asserted). }
procedure SetChatHandler(initcode:byte; handler:tChatHandler);
begin
  if (initcode<Low(ChatHandlers)) or (initcode>High(ChatHandlers)) then
    raise Exception.Create('Chat initcode '+IntToStr(initcode)+' out of range');
  assert(ChatHandlers[initcode]=nil);
  ChatHandlers[initcode]:=handler;
end;
{ Handler for messages that open a new chat.
  Accepts only the initial state (seq=1, ack_seq=0), reads the initcode that
  follows the header and looks up its handler. A tChat is then allocated,
  bound to the sender and the received opcode, marked as holding one unacked
  message and passed to the handler together with msg. The stream is
  positioned back on the initcode before the handler runs.
  Messages with an invalid state or an initcode outside the table are
  dropped silently; an initcode without a registered handler raises an
  Exception.
  NOTE(review): that exception is triggered by data taken from the message
  itself - confirm the caller copes with it. }
procedure OnHiMsg(msg:tSMsg);
  var code,initcode:byte;
      seq,aseq:word;
      handler:tChatHandler;
      chat:^tChat;
begin
  code:=msg.stream.ReadByte;
  assert(not IsMsgHandled(code,msg.source^));
  seq:=msg.stream.ReadWord(2);
  aseq:=msg.stream.ReadWord(2);
  if not ((seq=1) and (aseq=0)) then exit; {invalid initial state}

  initcode:=msg.stream.ReadByte;
  if (initcode<1) or (initcode>High(ChatHandlers)) then exit;
  handler:=ChatHandlers[initcode];
  if not Assigned(handler) then
    raise eXception.Create('No handler for initcode '+IntToStr(initcode));
  msg.stream.seek(msg.stream.position-1); {unskip the initcode}

  chat:=GetMem(sizeof(tChat));
  chat^.InitFrom(msg.Source^,code);
  {the opening message is message 1 and still has to be acked}
  chat^.rxacked:=false;
  chat^.rxSeq:=1;
  handler(chat^,msg);
end;
BEGIN
  {start with an empty handler table}
  FillChar(ChatHandlers,SizeOf(ChatHandlers),0);
  {OnHiMsg receives the messages that open new chats}
  ServerLoop.SetHiMsgHandler(@OnHiMsg);
END.