Recover broken bucket from progressively more bad nodes from other buckets.
[brdnet.git] / Chat.pas
blob409c8a32f79df99008d052898767dc1b96d92a30
1 unit Chat;
3 Implement two-way realiable acked lock-step protocol
5 INTERFACE
6 uses NetAddr,ServerLoop,MemStream;
8 type tChat=object
9 remote:tNetAddr;
10 opcode:byte;
11 txSeq:Word;
12 rxSeq:Word;
13 rxAcked:boolean;
14 closed:boolean;
15 RTT:LongWord;{in ms}
16 callback: procedure(msg:tSMsg; data:boolean) of object; {client must maintain active chats}
17 TMhook : procedure(willwait:LongWord ) of object;
18 DisposeHook: procedure of object; {called instead of freeing self}
19 procedure Init(const iremote:tNetAddr);
20 procedure AddHeaders(var s:tMemoryStream);
21 procedure StreamInit(var s:tMemoryStream; l:word);
22 procedure Send(s:tMemoryStream);
23 {the stream can be invalidated, but the buffer must not be modified or freed}
24 procedure Ack;
25 procedure Close;
26 private
27 txPk:pointer; txLen:word; {last sent, not acked msg}
28 txTime:tDateTime;
29 procedure InitFrom(const iremote:tNetAddr; iopcode:byte);
30 procedure Done;
31 procedure Resend;
32 procedure OnReply(msg:tSMsg);
33 end;
35 type tChatHandler=procedure(var nchat:tChat; msg:tSMsg);
36 procedure SetChatHandler(initcode:byte; handler:tChatHandler);
38 { download manager create FileRequest
39 File Request open chat session to server
40 upmgr accepts chat and send reply
41 FileRequest acks, chat is then closed after TimeWait
42 upmgr starts TC transfer
43 transfer finished, upmgr send new Chat to FileRequest
44 FileRequest acks, chat is closed on both ends
45 FileRequest can open new chat if blocks are missing
47 => chat msgs must be created with New, disposed by Chat
48 => there is TimeWait, no references are to the Chat, except Sheduler, it Disposes itself.
51 { Chats are the HiMsg. Use hash table from ServerLoop, works for HiMsg too. }
53 IMPLEMENTATION
54 uses SysUtils;
55 procedure tChat.Init(const iremote:tNetAddr);
56 begin
57 remote:=iremote;
58 opcode:=128+Random(128);
59 while ServerLoop.IsMsgHandled(opcode,remote) do inc(opcode);
60 InitFrom(remote,opcode);
61 end;
62 procedure tChat.InitFrom(const iremote:tNetAddr; iopcode:byte);
63 begin
64 remote:=iremote;
65 opcode:=iopcode;
66 SetMsgHandler(opcode,remote,@OnReply);
67 txSeq:=0;
68 rxSeq:=0;
69 rxAcked:=true; {to not ack pk 0}
70 closed:=false;
71 txPk:=nil;
72 txLen:=0;
73 callback:=nil;
74 TMhook:=nil;
75 DisposeHook:=nil;
76 RTT:=200; {a default for timeouts}
77 txTime:=0;
78 end;
79 {struct
80 opcode:byte
81 seq:2
82 ack_seq:2
83 data:xx
86 procedure tCHat.AddHeaders(var s:tMemoryStream);
87 begin s.skip(5) end;
88 procedure tChat.StreamInit(var s:tMemoryStream; l:word);
89 begin
90 s.Init(GetMem(l+5),0,l+5);
91 AddHeaders(s);
92 end;
94 procedure tChat.Send(s:tMemoryStream);
95 begin
96 if txLen>0 then begin
97 FreeMem(txPk,txLen);
98 UnShedule(@Resend);
99 end;
100 //assert(assigned(callback));
101 Inc(txSeq);
102 s.Seek(0);
103 s.WriteByte(opcode);
104 s.WriteWord(txSeq,2);
105 if not rxAcked then begin
106 s.WriteWord(rxSeq,2);
107 rxAcked:=true;
108 end else s.WriteWord(0,2);
109 txPk:=s.base;
110 txLen:=s.Length;
111 ServerLoop.SendMessage(txPk^,txLen,remote);
112 ServerLoop.Shedule(RTT*2,@Resend);
113 txTime:=Now;
114 end;
116 procedure tChat.Ack;
117 var s:tMemoryStream;
118 begin
119 if not rxAcked then begin
120 s.Init(GetMem(5),0,5);
121 s.WriteByte(opcode);
122 s.WriteWord(0,2);
123 s.WriteWord(rxSeq,2);
124 ServerLoop.SendMessage(s.base^,s.length,remote);
125 FreeMem(s.base,s.length);
126 rxAcked:=true;
127 end;
128 end;
130 procedure tChat.Close;
131 begin
132 assert(not closed);
133 Ack;
134 closed:=true;
135 callback:=nil; {avoid calling}
136 tmhook:=nil;
137 //writeln('Chat: closing');
138 if txLen=0 {no packets in flight} then begin
139 Shedule(15000{todo},@Done); {wait for something lost}
140 end;
141 end;
143 procedure tChat.Done;
144 begin
145 if txLen>0 then FreeMem(txPk,txLen);
146 SetMsgHandler(opcode,remote,nil);
147 if assigned(DisposeHook) then DisposeHook
148 else FreeMem(@self,sizeof(self));
149 //writeln('Chat: closed');
150 end;
152 procedure tChat.Resend;
153 {timeout waiting for ack}
154 begin
155 {resend and reshedule}
156 if txLen=0 then exit;
157 txTime:=0;
158 if RTT<1 then RTT:=2;
159 RTT:=RTT*2;
160 if assigned(TMhook) and (not closed) then begin
161 TMhook(RTT);
162 if closed then begin
163 Done; {if hook decided to close then abort}
164 exit;
165 end;
166 end;
167 if closed and (RTT<400) then RTT:=400;
168 if (RTT>=5000) and closed then begin
169 Done {give up}
170 end else begin
171 {finally resend the msg}
172 //writeln('Chat: retry');
173 ServerLoop.SendMessage(txPk^,txLen,remote);
174 ServerLoop.Shedule(RTT,@Resend);
175 end;
176 end;
178 procedure tChat.OnReply(msg:tSMsg);
179 var seq,aseq:Word;
180 var s:tMemoryStream;
181 begin
182 msg.stream.skip(1{opcode});
183 seq:=msg.stream.ReadWord(2);
184 aseq:=msg.stream.ReadWord(2);
185 if aseq>0 then {ack of our msg} begin
186 if (aseq=txSeq)and(txLen>0) {it is current} then begin
187 if txTime>0 then RTT:=Round((Now-txTime)*MsecsPerDay);
188 FreeMem(txPk,txLen);
189 UnShedule(@Resend);
190 if Closed then Shedule(5,@Done);
191 TxLen:=0;
192 txPk:=nil;
193 if assigned(callback) then callback(msg,false);
194 end else {write(' old-ack')it is ack of old data, do nothing};
195 end;
196 if seq>0 then {some data} begin
197 if seq<=rxSeq then {remote didnt get our ack} begin
198 s.Init(GetMem(5),0,5);
199 s.WriteByte(opcode);
200 s.WriteWord(0,2);
201 s.WriteWord(rxSeq,2);
202 ServerLoop.SendMessage(s.base^,s.length,remote);
203 FreeMem(s.base,s.length);
204 if seq=rxSeq then rxacked:=true;
205 end else begin
206 {some useful data!}
207 rxSeq:=seq;
208 rxAcked:=false;
209 if assigned(callback) then callback(msg,true);
210 end;
211 end;
212 end;
214 var ChatHandlers: array [1..32] of tChatHandler;
216 procedure SetChatHandler(initcode:byte; handler:tChatHandler);
217 begin
218 assert(ChatHandlers[initcode]=nil);
219 ChatHandlers[initcode]:=handler;
220 end;
222 procedure OnHiMsg(msg:tSMsg);
223 {new chat was received!}
224 var opcode:byte;
225 var seq,aseq:word;
226 var hnd:tChatHandler;
227 var nchat:^tChat;
228 var ix:byte;
229 begin
230 opcode:=msg.stream.ReadByte;
231 assert(not IsMsgHandled(opcode,msg.source^));
232 seq:=msg.stream.ReadWord(2);
233 aseq:=msg.stream.ReadWord(2);
234 if (seq<>1)or(aseq>0) then exit; {invalid initial state}
235 ix:=msg.stream.ReadByte;
236 if (ix<1)or(ix>high(ChatHandlers)) then exit;
237 hnd:=ChatHandlers[ix];
238 if not assigned(hnd) then raise eXception.Create('No handler for initcode '+IntToStr(ix));
239 msg.stream.seek(msg.stream.position-1);{unskip the initcode}
240 nchat:=GetMem(sizeof(tChat));
241 nchat^.InitFrom(msg.Source^,opcode);
242 nchat^.rxacked:=false;
243 nchat^.rxSeq:=1;
244 hnd(nchat^,msg);
245 end;
247 BEGIN
248 FillChar(ChatHandlers,sizeof(chathandlers),0);
249 ServerLoop.SetHiMsgHandler(@OnHiMsg);
250 END.