Fix chat not closed after last ack.
[brdnet.git] / Chat.pas
blobf7d67dfe7742005211fdd95e09dc40d569b07f65
1 unit Chat;
3 Implement two-way realiable acked lock-step protocol
5 INTERFACE
6 uses NetAddr,ServerLoop,MemStream;
8 type tChat=object
9 remote:tNetAddr;
10 opcode:byte;
11 txSeq:Word;
12 rxSeq:Word;
13 rxAcked:boolean;
14 closed:boolean;
15 RTT:LongWord;{in ms}
16 callback: procedure(msg:tSMsg; data:boolean) of object; {client must maintain active chats}
17 TMhook : procedure(willwait:LongWord ) of object;
18 DisposeHook: procedure of object; {called instead of freeing self}
19 procedure Init(const iremote:tNetAddr);
20 procedure AddHeaders(var s:tMemoryStream);
21 procedure StreamInit(var s:tMemoryStream; l:word);
22 procedure Send(s:tMemoryStream);
23 {the stream can be invalidated, but the buffer must not be modified or freed}
24 procedure Ack;
25 procedure Close;
26 private
27 txPk:pointer; txLen:word; {last sent, not acked msg}
28 txTime:tDateTime;
29 procedure InitFrom(const iremote:tNetAddr; iopcode:byte);
30 procedure Done;
31 procedure Resend;
32 procedure OnReply(msg:tSMsg);
33 end;
35 type tChatHandler=procedure(var nchat:tChat; msg:tSMsg);
36 procedure SetChatHandler(initcode:byte; handler:tChatHandler);
38 { download manager create FileRequest
39 File Request open chat session to server
40 upmgr accepts chat and send reply
41 FileRequest acks, chat is then closed after TimeWait
42 upmgr starts TC transfer
43 transfer finished, upmgr send new Chat to FileRequest
44 FileRequest acks, chat is closed on both ends
45 FileRequest can open new chat if blocks are missing
47 => chat msgs must be created with New, disposed by Chat
48 => there is TimeWait, no references are to the Chat, except Sheduler, it Disposes itself.
51 { Chats are the HiMsg. Use hash table from ServerLoop, works for HiMsg too. }
53 IMPLEMENTATION
54 uses SysUtils;
55 procedure tChat.Init(const iremote:tNetAddr);
56 begin
57 remote:=iremote;
58 opcode:=128;
59 while ServerLoop.IsMsgHandled(opcode,remote) do inc(opcode);
60 InitFrom(remote,opcode);
61 end;
62 procedure tChat.InitFrom(const iremote:tNetAddr; iopcode:byte);
63 begin
64 remote:=iremote;
65 opcode:=iopcode;
66 SetMsgHandler(opcode,remote,@OnReply);
67 txSeq:=0;
68 rxSeq:=0;
69 rxAcked:=true; {to not ack pk 0}
70 closed:=false;
71 txPk:=nil;
72 txLen:=0;
73 callback:=nil;
74 TMhook:=nil;
75 DisposeHook:=nil;
76 RTT:=200; {a default for timeouts}
77 txTime:=0;
78 end;
79 {struct
80 opcode:byte
81 seq:2
82 ack_seq:2
83 data:xx
86 procedure tCHat.AddHeaders(var s:tMemoryStream);
87 begin s.skip(5) end;
88 procedure tChat.StreamInit(var s:tMemoryStream; l:word);
89 begin
90 s.Init(GetMem(l+5),0,l+5);
91 AddHeaders(s);
92 end;
94 procedure tChat.Send(s:tMemoryStream);
95 begin
96 assert(txLen=0);
97 //assert(assigned(callback));
98 Inc(txSeq);
99 s.Seek(0);
100 s.WriteByte(opcode);
101 s.WriteWord(txSeq,2);
102 if not rxAcked then begin
103 s.WriteWord(rxSeq,2);
104 rxAcked:=true;
105 end else s.WriteWord(0,2);
106 txPk:=s.base;
107 txLen:=s.Length;
108 ServerLoop.SendMessage(txPk^,txLen,remote);
109 ServerLoop.Shedule(RTT*2,@Resend);
110 txTime:=Now;
111 end;
113 procedure tChat.Ack;
114 var s:tMemoryStream;
115 begin
116 if not rxAcked then begin
117 s.Init(GetMem(5),0,5);
118 s.WriteByte(opcode);
119 s.WriteWord(0,2);
120 s.WriteWord(rxSeq,2);
121 ServerLoop.SendMessage(s.base^,s.length,remote);
122 FreeMem(s.base,s.length);
123 rxAcked:=true;
124 end;
125 end;
127 procedure tChat.Close;
128 begin
129 assert(not closed);
130 Ack;
131 closed:=true;
132 //writeln('Chat: closing');
133 if txLen=0 {no packets in flight} then begin
134 Shedule(15000{todo},@Done); {wait for something lost}
135 callback:=nil; {avoid calling}
136 tmhook:=nil;
137 end;
138 end;
140 procedure tChat.Done;
141 begin
142 {called from sheduler, Done is unsheduled, Resend is not sheduled since ack was received when Done was sheduled}
143 if txLen>0 then FreeMem(txPk,txLen);
144 SetMsgHandler(opcode,remote,nil);
145 if assigned(DisposeHook) then DisposeHook
146 else FreeMem(@self,sizeof(self));
147 //writeln('Chat: closed');
148 end;
150 procedure tChat.Resend;
151 {timeout waiting for ack}
152 begin
153 {resend and reshedule}
154 if txLen=0 then exit;
155 txTime:=0;
156 if RTT<1 then RTT:=2;
157 RTT:=RTT*2;
158 if assigned(TMhook) and (not closed) then begin
159 TMhook(RTT);
160 if closed then begin
161 Done; {if hook decided to close then abort}
162 exit;
163 end;
164 end;
165 if closed and (RTT<400) then RTT:=400;
166 if (RTT>=5000) and closed then begin
167 Done {give up}
168 end else begin
169 {finally resend the msg}
170 //writeln('Chat: retry');
171 ServerLoop.SendMessage(txPk^,txLen,remote);
172 ServerLoop.Shedule(RTT,@Resend);
173 end;
174 end;
176 procedure tChat.OnReply(msg:tSMsg);
177 var seq,aseq:Word;
178 var s:tMemoryStream;
179 begin
180 msg.stream.skip(1{opcode});
181 seq:=msg.stream.ReadWord(2);
182 aseq:=msg.stream.ReadWord(2);
183 if aseq>0 then {ack of our msg} begin
184 if (aseq=txSeq)and(txLen>0) {it is current} then begin
185 if txTime>0 then RTT:=Round((Now-txTime)*MsecsPerDay);
186 FreeMem(txPk,txLen);
187 TxLen:=0;
188 txPk:=nil;
189 if assigned(callback) then callback(msg,false);
190 ServerLoop.UnShedule(@Resend);
191 if Closed then ServerLoop.Shedule(5000,@Done);
192 end else {write(' old-ack')it is ack of old data, do nothing};
193 end;
194 if seq>0 then {some data} begin
195 if seq<=rxSeq then {remote didnt get our ack} begin
196 s.Init(GetMem(5),0,5);
197 s.WriteByte(opcode);
198 s.WriteWord(0,2);
199 s.WriteWord(rxSeq,2);
200 ServerLoop.SendMessage(s.base^,s.length,remote);
201 FreeMem(s.base,s.length);
202 if seq=rxSeq then rxacked:=true;
203 end else begin
204 {some useful data!}
205 rxSeq:=seq;
206 rxAcked:=false;
207 if assigned(callback) then callback(msg,true);
208 end;
209 end;
210 end;
212 var ChatHandlers: array [1..32] of tChatHandler;
214 procedure SetChatHandler(initcode:byte; handler:tChatHandler);
215 begin
216 assert(ChatHandlers[initcode]=nil);
217 ChatHandlers[initcode]:=handler;
218 end;
220 procedure OnHiMsg(msg:tSMsg);
221 {new chat was received!}
222 var opcode:byte;
223 var seq,aseq:word;
224 var hnd:tChatHandler;
225 var nchat:^tChat;
226 var ix:byte;
227 begin
228 opcode:=msg.stream.ReadByte;
229 assert(not IsMsgHandled(opcode,msg.source^));
230 seq:=msg.stream.ReadWord(2);
231 aseq:=msg.stream.ReadWord(2);
232 if (seq<>1)or(aseq>0) then exit; {invalid initial state}
233 ix:=msg.stream.ReadByte;
234 if (ix<1)or(ix>high(ChatHandlers)) then exit;
235 hnd:=ChatHandlers[ix];
236 if not assigned(hnd) then raise eXception.Create('No handler for initcode '+IntToStr(ix));
237 msg.stream.seek(msg.stream.position-1);{unskip the initcode}
238 nchat:=GetMem(sizeof(tChat));
239 nchat^.InitFrom(msg.Source^,opcode);
240 nchat^.rxacked:=false;
241 nchat^.rxSeq:=1;
242 hnd(nchat^,msg);
243 end;
245 BEGIN
246 FillChar(ChatHandlers,sizeof(chathandlers),0);
247 ServerLoop.SetHiMsgHandler(@OnHiMsg);
248 END.