Fix Chat and PT in ServerLoop so it works.
[brdnet.git] / Chat.pas
blob96f6420963c362d71498cbfc404f942c50b14b5e
1 unit Chat;
3 Implement a two-way, reliable, acked, lock-step protocol
5 INTERFACE
6 uses NetAddr,ServerLoop,MemStream;
{ tChat: state of one lock-step, acknowledged conversation with a single peer.
  At most one sent message is outstanding (unacknowledged) at a time. }
type tChat=object
  remote  : tNetAddr;   {address of the peer}
  opcode  : byte;       {first byte of every packet belonging to this chat}
  txSeq   : Word;       {sequence number of the most recently sent message}
  rxSeq   : Word;       {sequence number of the most recently received message}
  rxAcked : boolean;    {true once rxSeq has been acknowledged to the peer}
  closed  : boolean;    {set by Close}
  RTT     : LongWord;   {in ms}
  {invoked with data=true for an incoming message and data=false when
   the outstanding message was acknowledged}
  callback: procedure(msg:tSMsg; data:boolean) of object; {client must maintain active chats}
  {invoked from the retransmit timeout with the new, doubled timeout}
  TMhook  : procedure(willwait:LongWord ) of object;
  DisposeHook: procedure of object; {called instead of freeing self}

  procedure Init(const iremote:tNetAddr);
  procedure AddHeaders(var s:tMemoryStream);
  procedure Send(s:tMemoryStream);
   {the stream can be invalidated, but the buffer must not be modified or freed}
  procedure Ack;
  procedure Close;

 private
  {last sent, not acked msg}
  txPk  : pointer;
  txLen : word;
  txTime: tDateTime;    {when txPk was sent; 0 when no RTT sample may be taken}
  procedure InitFrom(const iremote:tNetAddr; iopcode:byte);
  procedure Done;
  procedure Resend;
  procedure OnReply(msg:tSMsg);
end;
{ Handler called when a peer opens a new chat.
  nchat is the freshly initialised chat; msg is the opening message with
  its stream positioned on the initcode byte. }
type
  tChatHandler = procedure(var nchat:tChat; msg:tSMsg);

{ Registers the handler for new chats whose first data byte equals initcode. }
procedure SetChatHandler(initcode:byte; handler:tChatHandler);
37 { The download manager creates a FileRequest.
38 The FileRequest opens a chat session to the server.
39 upmgr accepts the chat and sends a reply.
40 FileRequest acks; the chat is then closed after TimeWait.
41 upmgr starts the TC transfer.
42 When the transfer has finished, upmgr sends a new Chat to FileRequest.
43 FileRequest acks; the chat is closed on both ends.
44 FileRequest can open a new chat if blocks are missing.
46 => chat msgs must be created with New and are disposed by Chat
47 => there is a TimeWait during which nothing references the Chat except the scheduler; the Chat disposes itself.
50 { Chats are the HiMsg. Use hash table from ServerLoop, works for HiMsg too. }
52 IMPLEMENTATION
53 uses SysUtils;
{ Opens a new outgoing chat to iremote.
  Picks the first opcode in 128..255 that has no handler registered for this
  peer and then initialises the chat with it.
  Raises Exception when all of these opcodes are already in use; without the
  check the byte counter would wrap around to 0 and either select an opcode
  below 128 or never terminate. }
procedure tChat.Init(const iremote:tNetAddr);
  var candidate:byte;
  begin
  candidate:=128;
  while ServerLoop.IsMsgHandled(candidate,iremote) do begin
    if candidate=high(candidate) then
      raise Exception.Create('No free chat opcode for remote');
    inc(candidate);
  end;
  {InitFrom stores remote and opcode and registers the reply handler}
  InitFrom(iremote,candidate);
end;
{ Puts the chat into its initial state for the given peer and opcode and
  registers OnReply as the handler for packets with that opcode from the peer. }
procedure tChat.InitFrom(const iremote:tNetAddr; iopcode:byte);
  begin
  {identity first: the handler registration below uses both fields}
  remote:=iremote;
  opcode:=iopcode;
  SetMsgHandler(opcode,remote,@OnReply);

  {sequence state}
  txSeq:=0;
  rxSeq:=0;
  rxAcked:=true; {to not ack pk 0}
  closed:=false;

  {nothing in flight}
  txPk:=nil;
  txLen:=0;
  txTime:=0;
  RTT:=200; {a default for timeouts}

  {no client hooks installed yet}
  callback:=nil;
  TMhook:=nil;
  DisposeHook:=nil;
end;
78 {struct
79 opcode:byte
80 seq:2
81 ack_seq:2
82 data:xx
{ Advances the stream past the space that Send later fills with the
  chat header, so the caller can write its payload right after it. }
procedure tChat.AddHeaders(var s:tMemoryStream);
  const cHeaderSize = 1{opcode} + 2{seq} + 2{ack_seq};
  begin
  s.skip(cHeaderSize);
end;
{ Sends the message held in s as the next message of this chat.
  The first 5 bytes of the buffer are overwritten with the header (opcode,
  sequence number, piggy-backed ack). The buffer is remembered in txPk/txLen
  for retransmission and a retransmit timeout of 2*RTT is scheduled.
  Only one message may be outstanding: txLen must be 0 on entry. }
procedure tChat.Send(s:tMemoryStream);
  var ackField:Word;
  begin
  assert(txLen=0);
  //assert(assigned(callback));
  Inc(txSeq);

  {piggy-back a pending acknowledgement, otherwise send 0 = "no ack"}
  if rxAcked then ackField:=0
  else begin
    ackField:=rxSeq;
    rxAcked:=true;
  end;

  {fill in the header at the start of the buffer}
  s.Seek(0);
  s.WriteByte(opcode);
  s.WriteWord(txSeq,2);
  s.WriteWord(ackField,2);

  {keep the packet until it is acknowledged}
  txPk:=s.base;
  txLen:=s.Length;

  ServerLoop.SendMessage(txPk^,txLen,remote);
  ServerLoop.Shedule(RTT*2,@Resend);
  txTime:=Now;
end;
{ Sends a bare acknowledgement (no data, seq=0) for the last received
  message, unless that message has been acknowledged already. }
procedure tChat.Ack;
  var pk:tMemoryStream;
  begin
  if rxAcked then exit; {nothing pending}

  pk.Init(GetMem(5),0,5);
  pk.WriteByte(opcode);
  pk.WriteWord(0,2);      {seq 0: this packet carries no data}
  pk.WriteWord(rxSeq,2);  {the message being acknowledged}
  ServerLoop.SendMessage(pk.base^,pk.length,remote);
  FreeMem(pk.base,pk.length);

  rxAcked:=true;
end;
{ Closes the chat. Must not be called twice.
  A pending acknowledgement is sent first. With nothing in flight the hooks
  are cleared and Done is scheduled after a wait; with a message still in
  flight only the closed flag is set here. }
procedure tChat.Close;
  begin
  assert(not closed);
  Ack;
  closed:=true;

  if txLen<>0 then exit; {packet in flight: Resend gives up on a closed chat once RTT>16000}

  {no packets in flight}
  callback:=nil; {avoid calling}
  tmhook:=nil;
  Shedule(3000{todo},@Done); {wait for something lost}
end;
{ Final teardown: releases an unacknowledged packet, unregisters the
  message handler and disposes the chat (via DisposeHook when one is set,
  otherwise by freeing its own memory). The object must not be touched
  after this returns. }
procedure tChat.Done;
  begin
  {called from sheduler, Done is unsheduled, Resend is not sheduled since ack was received when Done was sheduled}
  if txLen>0 then
    FreeMem(txPk,txLen);

  SetMsgHandler(opcode,remote,nil);

  if not assigned(DisposeHook) then
    FreeMem(@self,sizeof(self))
  else
    DisposeHook;
end;
{ Retransmit timeout: no ack arrived for the outstanding message.
  Doubles RTT, informs TMhook (unless the chat is closed), and then either
  gives up (closed chat and RTT above 16000 ms) or sends the packet again
  and re-arms the timer. }
procedure tChat.Resend;
  begin
  Assert(txLen>0);
  txTime:=0;     {a retransmitted packet yields no RTT sample}
  RTT:=RTT*2;

  if (not closed) and assigned(TMhook) then begin
    TMhook(RTT);
    if closed then begin
      {if hook decided to close then abort}
      Done;
      exit;
    end;
  end;

  if closed and (RTT>16000) then begin
    {give up}
    Done;
    exit;
  end;

  {finally resend the msg and reshedule}
  ServerLoop.SendMessage(txPk^,txLen,remote);
  ServerLoop.Shedule(RTT,@Resend);
end;
{ Handles a packet received for this chat.
  Packet layout: opcode(1) seq(2) ack_seq(2) data. seq=0 means "no data",
  ack_seq=0 means "no ack".
  - New data (seq > rxSeq) is recorded and passed to callback(msg,true).
  - Old data (seq <= rxSeq) means the peer missed our ack, so it is re-sent.
  - An ack of the outstanding message frees the packet, cancels the
    retransmit timer and calls callback(msg,false).
  NOTE(review): data is processed before the ack of the same packet, so a
  callback(msg,true) that calls Send while a message is still outstanding
  trips Send's assertion - confirm whether clients rely on this order. }
procedure tChat.OnReply(msg:tSMsg);
  var seq,aseq:Word;
  var s:tMemoryStream;
  var elapsed:Int64;
  var wasClosed:boolean;
  begin
  msg.stream.skip(1{opcode});
  seq:=msg.stream.ReadWord(2);
  aseq:=msg.stream.ReadWord(2);

  if seq>0 then {some data} begin
    if seq<=rxSeq then {remote didnt get our ack} begin
      s.Init(GetMem(5),0,5);
      s.WriteByte(opcode);
      s.WriteWord(0,2);
      s.WriteWord(rxSeq,2);
      ServerLoop.SendMessage(s.base^,s.length,remote);
      FreeMem(s.base,s.length);
      if seq=rxSeq then rxAcked:=true;
    end else begin
      {some useful data!}
      rxSeq:=seq;
      rxAcked:=false;
      if assigned(callback) then callback(msg,true);
    end;
  end;

  {an ack that is not for the current outstanding message is old: ignore it}
  if (aseq>0) and (aseq=txSeq) and (txLen>0) then begin
    if txTime>0 then begin
      elapsed:=Round((Now-txTime)*MsecsPerDay);
      {never store 0 (or a negative value after a clock step): Send and
       Resend derive their timeouts by multiplying RTT, so 0 would make
       every later timeout 0 as well}
      if elapsed<1 then elapsed:=1;
      RTT:=elapsed;
    end;
    FreeMem(txPk,txLen);
    txLen:=0;
    txPk:=nil;
    {cancel the timer BEFORE the callback: the callback may Send the next
     message, and its freshly scheduled Resend must not be cancelled}
    ServerLoop.UnShedule(@Resend);
    wasClosed:=closed;
    if assigned(callback) then callback(msg,false);
    {Close was called while this message was in flight and therefore did
     not schedule the teardown; do it now that nothing is outstanding.
     wasClosed (not closed) is tested so that a Close issued from the
     callback above, which schedules Done itself, is not doubled.}
    if wasClosed and (txLen=0) then begin
      callback:=nil; {avoid calling}
      TMhook:=nil;
      ServerLoop.Shedule(3000{todo},@Done); {wait for something lost}
    end;
  end;
end;
var ChatHandlers: array [1..32] of tChatHandler;

{ Registers handler for new incoming chats that start with initcode.
  initcode must lie within the bounds of ChatHandlers (1..32); anything else
  raises ERangeError instead of writing outside the table.
  Registering a second handler for the same initcode fails the assertion. }
procedure SetChatHandler(initcode:byte; handler:tChatHandler);
  begin
  if (initcode<low(ChatHandlers)) or (initcode>high(ChatHandlers)) then
    raise ERangeError.Create('Chat initcode '+IntToStr(initcode)+' out of range');
  assert(ChatHandlers[initcode]=nil);
  ChatHandlers[initcode]:=handler;
end;
{ Handles a message that opens a new chat.
  The message must be the very first of a conversation (seq=1, no ack) and
  its first data byte (the initcode) selects the registered handler. A new
  tChat is allocated, initialised for the sender and handed to that handler
  together with the message, whose stream is positioned on the initcode.
  Messages with an invalid initial state or an out-of-table initcode are
  dropped; an in-table initcode without a handler raises an exception. }
procedure OnHiMsg(msg:tSMsg);
  var code,initcode:byte;
  var dataSeq,ackSeq:word;
  var handler:tChatHandler;
  var fresh:^tChat;
  begin
  code:=msg.stream.ReadByte;
  assert(not IsMsgHandled(code,msg.source^));
  dataSeq:=msg.stream.ReadWord(2);
  ackSeq:=msg.stream.ReadWord(2);
  {invalid initial state}
  if not ((dataSeq=1) and (ackSeq=0)) then exit;

  initcode:=msg.stream.ReadByte;
  if not ((initcode>=1) and (initcode<=high(ChatHandlers))) then exit;
  handler:=ChatHandlers[initcode];
  if not assigned(handler) then
    raise eXception.Create('No handler for initcode '+IntToStr(initcode));

  {unskip the initcode}
  msg.stream.seek(msg.stream.position-1);

  fresh:=GetMem(sizeof(tChat));
  fresh^.InitFrom(msg.Source^,code);
  {the opening message is message 1 and still has to be acknowledged}
  fresh^.rxSeq:=1;
  fresh^.rxAcked:=false;
  handler(fresh^,msg);
end;
BEGIN
  {unit start-up: begin with an empty handler table...}
  FillChar(ChatHandlers, SizeOf(ChatHandlers), 0);
  {...and let OnHiMsg receive the messages ServerLoop passes to its HiMsg handler}
  ServerLoop.SetHiMsgHandler(@OnHiMsg);
END.