Fix Sheduler. Also raise receive buffer to 4k. +pollution
[brdnet.git] / Chat.pas
blob9391ffcba7bf14088da0348ee633eacc0a8716eb
1 unit Chat;
3 Implement two-way reliable acked lock-step protocol
5 INTERFACE
6 uses NetAddr,ServerLoop,MemStream;
type tChat=object
 {One acknowledged, lock-step conversation with a single remote peer.
  At most one outgoing message is in flight at a time (Send asserts this).}
 remote: tNetAddr;          {peer address; part of the handler key}
 opcode: byte;              {first byte of every packet of this chat}
 txSeq, rxSeq: Word;        {seq of last message sent / last data message accepted}
 rxAcked, closed: boolean;  {rxSeq already acknowledged? / Close was called?}
 RTT: LongWord;             {in ms; doubled on every resend timeout}
 {invoked from OnReply: data=true for new payload, data=false when our message got acked}
 callback: procedure(msg:tSMsg; data:boolean) of object; {client must maintain active chats}
 {invoked on resend timeout with the new (doubled) RTT, only while not closed}
 TMhook: procedure(willwait:LongWord) of object;
 DisposeHook: procedure of object; {called instead of freeing self}

 procedure Init(const iremote:tNetAddr);
 procedure AddHeaders(var s:tMemoryStream);
 procedure StreamInit(var s:tMemoryStream; l:word);
 procedure Send(s:tMemoryStream);
  {the stream can be invalidated, but the buffer must not be modified or freed}
 procedure Ack;
 procedure Close;

 private
 txPk: pointer;     {last sent, not acked msg}
 txLen: word;       {its length; 0 means nothing is in flight}
 txTime: tDateTime; {when it was first sent; reset to 0 on resend}
 procedure InitFrom(const iremote:tNetAddr; iopcode:byte);
 procedure Done;
 procedure Resend;
 procedure OnReply(msg:tSMsg);
end;
{Signature of a function that takes over a freshly accepted incoming chat.
 msg is the initial message, positioned at the initcode byte.}
type tChatHandler = procedure(var nchat: tChat; msg: tSMsg);

{Register handler for incoming chats whose first payload byte equals initcode.}
procedure SetChatHandler(initcode: byte; handler: tChatHandler);
38 { The download manager creates a FileRequest.
39 The FileRequest opens a chat session to the server.
40 upmgr accepts the chat and sends a reply.
41 FileRequest acks; the chat is then closed after TimeWait.
42 upmgr starts the TC transfer.
43 When the transfer has finished, upmgr sends a new Chat to FileRequest.
44 FileRequest acks; the chat is closed on both ends.
45 FileRequest can open a new chat if blocks are missing.
47 => chat msgs must be created with New; they are disposed by Chat
48 => during TimeWait nothing references the Chat except the Sheduler, so the Chat disposes itself.
51 { Chats are the HiMsg. Use hash table from ServerLoop, works for HiMsg too. }
53 IMPLEMENTATION
54 uses SysUtils;
{Start a new outgoing chat with iremote.
 Picks a random opcode in 128..255 that has no handler registered for this
 remote yet and registers the chat under it.
 Raises Exception when all 128 opcodes are already in use for that remote.}
procedure tChat.Init(const iremote:tNetAddr);
 var tries:integer;
 begin
 remote:=iremote;
 opcode:=128+Random(128);
 tries:=128; {number of distinct opcodes in 128..255}
 while ServerLoop.IsMsgHandled(opcode,remote) do begin
  dec(tries);
  if tries=0 then
   raise Exception.Create('Chat: no free opcode for remote');
  {stay inside 128..255; a plain inc() would wrap the byte from 255 to 0}
  if opcode=255 then opcode:=128 else inc(opcode);
 end;
 InitFrom(remote,opcode);
end;
{Bind the chat to (iremote, iopcode), register OnReply as the message handler
 for that pair and reset all protocol state to "fresh chat".}
procedure tChat.InitFrom(const iremote:tNetAddr; iopcode:byte);
 begin
 {identity; the handler registration below uses both values}
 opcode:=iopcode;
 remote:=iremote;
 SetMsgHandler(opcode,remote,@OnReply);

 {sequence tracking}
 rxSeq:=0;
 txSeq:=0;
 rxAcked:=true; {to not ack pk 0}
 closed:=false;

 {nothing in flight}
 txLen:=0;
 txPk:=nil;
 txTime:=0;
 RTT:=200; {a default for timeouts}

 {hooks stay unset until the owner assigns them}
 DisposeHook:=nil;
 TMhook:=nil;
 callback:=nil;
end;
79 {struct
80 opcode:byte
81 seq:2
82 ack_seq:2
83 data:xx
{Reserve room for the chat header at the current position of s.
 The header itself is filled in later by Send.}
procedure tChat.AddHeaders(var s:tMemoryStream);
 const cHeaderLen=5; {opcode:1 + seq:2 + ack_seq:2}
 begin
 s.skip(cHeaderLen);
end;
{Allocate a buffer for l payload bytes plus the 5 byte chat header,
 initialise s on it and skip over the header space.}
procedure tChat.StreamInit(var s:tMemoryStream; l:word);
 var total:LongWord;
 begin
 total:=l+5;
 s.Init(GetMem(total),0,total);
 AddHeaders(s);
end;
{Send the message held in s as the next message of this chat.
 The first 5 bytes of the buffer are overwritten with the header; a pending
 acknowledgement for rxSeq is piggy-backed. The buffer (s.base) is kept as
 txPk until it is acknowledged, and a Resend timer of 2*RTT is started.
 Only one message may be in flight (asserted).}
procedure tChat.Send(s:tMemoryStream);
 var ackSeq:Word;
 begin
 assert(txLen=0);
 //assert(assigned(callback));
 {ack_seq field: 0 means "no ack carried"}
 if rxAcked then ackSeq:=0 else ackSeq:=rxSeq;
 Inc(txSeq);
 {fill in the header}
 s.Seek(0);
 s.WriteByte(opcode);
 s.WriteWord(txSeq,2);
 s.WriteWord(ackSeq,2);
 rxAcked:=true; {either it already was, or the ack has just been written}
 {remember the packet for retransmission}
 txPk:=s.base;
 txLen:=s.Length;
 ServerLoop.SendMessage(txPk^,txLen,remote);
 ServerLoop.Shedule(RTT*2,@Resend);
 txTime:=Now;
end;
{Send a bare acknowledgement (seq=0, ack_seq=rxSeq) if the last received
 data message has not been acknowledged yet; otherwise do nothing.}
procedure tChat.Ack;
 const cAckLen=5; {opcode:1 + seq:2 + ack_seq:2}
 var s:tMemoryStream;
 var buf:pointer;
 begin
 if rxAcked then exit;
 buf:=GetMem(cAckLen);
 try
  s.Init(buf,0,cAckLen);
  s.WriteByte(opcode);
  s.WriteWord(0,2);
  s.WriteWord(rxSeq,2);
  ServerLoop.SendMessage(s.base^,s.length,remote);
 finally
  {release the buffer even when writing or sending raises; free by the
   allocated size rather than by the stream's current length}
  FreeMem(buf,cAckLen);
 end;
 rxAcked:=true;
end;
{Mark the chat closed after acknowledging anything still unacknowledged.
 With nothing in flight, Done is scheduled after a wait period and the hooks
 are cleared. With a message still in flight nothing more happens here.}
procedure tChat.Close;
 begin
 assert(not closed);
 Ack;
 closed:=true;
 //writeln('Chat: closing');
 if txLen<>0 then exit; {a packet is still in flight}
 Shedule(15000{todo},@Done); {wait for something lost}
 callback:=nil; {avoid calling}
 tmhook:=nil;
end;
{Tear the chat down: free an unacknowledged packet if there is one,
 unregister the message handler, then hand the object to DisposeHook or,
 when no hook is set, free the object's own memory.}
procedure tChat.Done;
 begin
 if txLen>0 then FreeMem(txPk,txLen);
 SetMsgHandler(opcode,remote,nil);
 if not assigned(DisposeHook) then FreeMem(@self,sizeof(self))
 else DisposeHook;
 //writeln('Chat: closed');
end;
{Timer callback: the in-flight message was not acknowledged in time.
 Doubles RTT, notifies TMhook (only while the chat is open) and either
 retransmits and re-arms the timer, or gives up via Done.}
procedure tChat.Resend;
 begin
 if txLen=0 then exit; {nothing in flight}
 txTime:=0; {a retransmitted packet is not used for RTT measurement}
 {exponential back-off}
 if RTT<1 then RTT:=2;
 RTT:=RTT*2;
 if (not closed) and assigned(TMhook) then begin
  TMhook(RTT);
  if closed then begin
   Done; {if hook decided to close then abort}
   exit;
  end;
 end;
 if closed then begin
  if RTT<400 then RTT:=400;
  if RTT>=5000 then begin
   Done; {give up}
   exit;
  end;
 end;
 {finally resend the msg}
 //writeln('Chat: retry');
 ServerLoop.SendMessage(txPk^,txLen,remote);
 ServerLoop.Shedule(RTT,@Resend);
end;
{Handler for every packet that arrives for this chat.
 Packet layout: opcode:1, seq:2, ack_seq:2, data.
 - ack_seq>0 and equal to txSeq while a message is in flight: the message is
   acknowledged; RTT is updated, the packet freed, the Resend timer removed,
   and callback(msg,false) is invoked. If the chat is closed, Done is scheduled.
 - seq>0 and not newer than rxSeq: a duplicate; the ack for rxSeq is repeated.
 - seq>0 and newer than rxSeq: new data; callback(msg,true) is invoked and the
   ack is left pending for Send/Ack.}
procedure tChat.OnReply(msg:tSMsg);
 const cAckLen=5; {opcode:1 + seq:2 + ack_seq:2}
 var seq,aseq:Word;
 var s:tMemoryStream;
 var buf:pointer;
 var elapsed:Int64;
 begin
 msg.stream.skip(1{opcode});
 seq:=msg.stream.ReadWord(2);
 aseq:=msg.stream.ReadWord(2);
 if aseq>0 then {ack of our msg} begin
  if (aseq=txSeq)and(txLen>0) {it is current} then begin
   if txTime>0 then begin
    elapsed:=Round((Now-txTime)*MsecsPerDay);
    {Now is wall-clock time: if the clock was stepped back the sample is
     negative and must not be stored into the unsigned RTT}
    if (elapsed>=0)and(elapsed<=High(LongWord)) then RTT:=LongWord(elapsed);
   end;
   FreeMem(txPk,txLen);
   UnShedule(@Resend);
   if Closed then Shedule(5,@Done);
   TxLen:=0;
   txPk:=nil;
   if assigned(callback) then callback(msg,false);
  end else {write(' old-ack')it is ack of old data, do nothing};
 end;
 if seq>0 then {some data} begin
  if seq<=rxSeq then {remote didnt get our ack} begin
   buf:=GetMem(cAckLen);
   try
    s.Init(buf,0,cAckLen);
    s.WriteByte(opcode);
    s.WriteWord(0,2);
    s.WriteWord(rxSeq,2);
    ServerLoop.SendMessage(s.base^,s.length,remote);
   finally
    {release the buffer even when writing or sending raises}
    FreeMem(buf,cAckLen);
   end;
   if seq=rxSeq then rxacked:=true;
  end else begin
   {some useful data!}
   rxSeq:=seq;
   rxAcked:=false;
   if assigned(callback) then callback(msg,true);
  end;
 end;
end;
{Handlers for incoming chats, indexed by the initcode byte of the first message.}
var ChatHandlers: array [1..32] of tChatHandler;

{Register handler for incoming chats that start with initcode.
 Raises ERangeError when initcode is outside the bounds of ChatHandlers;
 without this check such a value would write outside the array.
 Registering the same initcode twice is a programming error (asserted).}
procedure SetChatHandler(initcode:byte; handler:tChatHandler);
 begin
 if (initcode<low(ChatHandlers))or(initcode>high(ChatHandlers)) then
  raise ERangeError.CreateFmt('Chat initcode %d out of range %d..%d',
   [initcode,low(ChatHandlers),high(ChatHandlers)]);
 assert(ChatHandlers[initcode]=nil);
 ChatHandlers[initcode]:=handler;
end;
{Handler for messages that start a new incoming chat.
 Accepts only a first message (seq=1, ack_seq=0) whose initcode byte selects a
 registered handler; anything else is dropped without a reply. On success a
 tChat is allocated, bound to the sender and opcode, marked as having
 received message 1 (ack pending), and passed to the handler together with
 the message positioned at the initcode byte.}
procedure OnHiMsg(msg:tSMsg);
 {new chat was received!}
 var opcode:byte;
 var seq,aseq:word;
 var hnd:tChatHandler;
 var nchat:^tChat;
 var ix:byte;
 begin
 opcode:=msg.stream.ReadByte;
 assert(not IsMsgHandled(opcode,msg.source^));
 seq:=msg.stream.ReadWord(2);
 aseq:=msg.stream.ReadWord(2);
 if (seq<>1)or(aseq>0) then exit; {invalid initial state}
 ix:=msg.stream.ReadByte;
 if (ix<low(ChatHandlers))or(ix>high(ChatHandlers)) then exit;
 hnd:=ChatHandlers[ix];
 {ix comes from the network: an unregistered initcode is dropped like an
  out-of-range one, instead of raising an exception on remote input}
 if not assigned(hnd) then exit;
 msg.stream.seek(msg.stream.position-1);{unskip the initcode}
 nchat:=GetMem(sizeof(tChat));
 nchat^.InitFrom(msg.Source^,opcode);
 nchat^.rxacked:=false;
 nchat^.rxSeq:=1;
 hnd(nchat^,msg);
end;
initialization
 {start with no chat handlers registered}
 FillChar(ChatHandlers,SizeOf(ChatHandlers),0);
 {incoming messages that open a new chat are routed to OnHiMsg}
 ServerLoop.SetHiMsgHandler(@OnHiMsg);
end.