Chat complete impl
[brdnet.git] / Chat.pas
blobe57338bda4bfb0ef1f5097af6ed4caeab1f9fbd3
1 unit Chat;
3 Implement two-way realiable acked lock-step protocol
5 INTERFACE
6 uses NetAddr,ServerLoop,MemStream;
8 type tChat=object
9 remote:tNetAddr;
10 opcode:byte;
11 txSeq:Word;
12 rxSeq:Word;
13 rxAcked:boolean;
14 closed:boolean;
15 RTT:LongWord;{in ms}
16 callback: procedure(msg:tSMsg; data:boolean) of object; {client must maintain active chats}
17 TMhook : procedure(willwait:LongWord ) of object;
18 procedure Init(const iremote:tNetAddr);
19 procedure AddHeaders(var s:tMemoryStream);
20 procedure Send(s:tMemoryStream);
21 {the stream can be invalidated, but the buffer must not be modified or freed}
22 procedure Ack;
23 procedure Close;
24 private
25 txPk:pointer; txLen:word; {last sent, not acked msg}
26 txTime:tDateTime;
27 procedure InitFrom(const iremote:tNetAddr; iopcode:byte);
28 procedure Done;
29 procedure Resend;
30 procedure OnReply(msg:tSMsg);
31 end;
33 type tChatHandler=procedure(var nchat:tChat; msg:tSMsg);
34 procedure SetChatHandler(initcode:byte; handler:tChatHandler);
36 { download manager create FileRequest
37 File Request open chat session to server
38 upmgr accepts chat and send reply
39 FileRequest acks, chat is then closed after TimeWait
40 upmgr starts TC transfer
41 transfer finished, upmgr send new Chat to FileRequest
42 FileRequest acks, chat is closed on both ends
43 FileRequest can open new chat if blocks are missing
45 => chat msgs must be created with New, disposed by Chat
46 => there is TimeWait, no references are to the Chat, except Sheduler, it Disposes itself.
49 { Chats are the HiMsg. Use hash table from ServerLoop, works for HiMsg too. }
51 IMPLEMENTATION
52 uses SysUtils;
53 procedure tChat.Init(const iremote:tNetAddr);
54 begin
55 remote:=iremote;
56 opcode:=128;
57 while ServerLoop.IsMsgHandled(opcode,remote) do inc(opcode);
58 InitFrom(remote,opcode);
59 end;
60 procedure tChat.InitFrom(const iremote:tNetAddr; iopcode:byte);
61 begin
62 opcode:=iopcode;
63 txSeq:=0;
64 rxSeq:=0;
65 rxAcked:=true; {to not ack pk 0}
66 closed:=false;
67 txPk:=nil;
68 txLen:=0;
69 callback:=nil;
70 TMhook:=nil;
71 RTT:=500; {a default for timeouts}
72 txTime:=0;
73 end;
74 {struct
75 opcode:byte
76 seq:2
77 ack_seq:2
78 data:xx
81 procedure tCHat.AddHeaders(var s:tMemoryStream);
82 begin s.skip(5) end;
84 procedure tChat.Send(s:tMemoryStream);
85 begin
86 assert(txLen=0);
87 assert(assigned(callback));
88 Inc(txSeq);
89 s.Seek(0);
90 s.WriteByte(opcode);
91 s.WriteWord(txSeq,2);
92 if not rxAcked then begin
93 s.WriteWord(rxSeq,2);
94 rxAcked:=true;
95 end else s.WriteWord(0,2);
96 txPk:=s.base;
97 txLen:=s.Length;
98 ServerLoop.SendMessage(txPk^,txLen,remote);
99 ServerLoop.Shedule(RTT*2,@Resend);
100 txTime:=Now;
101 end;
103 procedure tChat.Ack;
104 var s:tMemoryStream;
105 begin
106 if not rxAcked then begin
107 s.Init(GetMem(5),0,5);
108 s.WriteByte(opcode);
109 s.WriteWord(0,2);
110 s.WriteWord(rxSeq,2);
111 ServerLoop.SendMessage(s.base^,s.length,remote);
112 FreeMem(s.base,s.length);
113 rxAcked:=true;
114 end;
115 end;
117 procedure tChat.Close;
118 begin
119 assert(not closed);
120 closed:=true;
121 if txLen=0 {no packets in flight} then begin
122 Shedule(3000{todo},@Done); {wait for something lost}
123 callback:=nil; {avoid calling}
124 end;
125 end;
127 procedure tChat.Done;
128 begin
129 {called from sheduler, Done is unsheduled, Resend is not sheduled since ack was received when Done was sheduled}
130 FreeMem(txPk,txLen);
131 SetMsgHandler(opcode,remote,nil);
132 FreeMem(@self,sizeof(self));
133 end;
135 procedure tChat.Resend;
136 {timeout waiting for ack}
137 begin
138 {resend and reshedule}
139 txTime:=0;
140 RTT:=RTT*2;
141 if assigned(TMhook) and (not closed) then begin
142 TMhook(RTT*2);
143 if closed then begin
144 Done; {if hook decided to close then abort}
145 exit;
146 end;
147 end;
148 if (RTT>32000) and closed
149 then Done {give up}
150 else if txLen>0 then begin
151 ServerLoop.SendMessage(txPk^,txLen,remote);
152 ServerLoop.Shedule(RTT*2,@Resend);
153 end;
154 end;
156 procedure tChat.OnReply(msg:tSMsg);
157 var seq,aseq:Word;
158 var s:tMemoryStream;
159 begin
160 msg.stream.skip(1{opcode});
161 seq:=msg.stream.ReadWord(2);
162 aseq:=msg.stream.ReadWord(2);
163 if seq>0 then {some data} begin
164 if seq<=rxSeq then {remote didnt get our ack} begin
165 s.Init(GetMem(5),0,5);
166 s.WriteByte(opcode);
167 s.WriteWord(0,2);
168 s.WriteWord(rxSeq,2);
169 ServerLoop.SendMessage(s.base^,s.length,remote);
170 FreeMem(s.base,s.length);
171 if seq=rxSeq then rxacked:=true;
172 end else begin
173 {some useful data!}
174 rxSeq:=seq;
175 rxAcked:=false;
176 callback(msg,true);
177 end;
178 end;
179 if aseq>0 then {ack of our msg} begin
180 if (aseq=rxSeq)and(txLen>0) {it is current} then begin
181 if txTime>0 then RTT:=Round((Now-txTime)*MsecsPerDay);
182 FreeMem(txPk,txLen);
183 TxLen:=0;
184 txPk:=nil;
185 callback(msg,false);
186 end else {it is ack of old data, do nothing};
187 end;
188 end;
190 var ChatHandlers: array [1..32] of tChatHandler;
192 procedure SetChatHandler(initcode:byte; handler:tChatHandler);
193 begin
194 assert(ChatHandlers[initcode]=nil);
195 ChatHandlers[initcode]:=handler;
196 end;
198 procedure OnHiMsg(msg:tSMsg);
199 {new chat was received!}
200 var opcode:byte;
201 var seq,aseq:word;
202 var hnd:tChatHandler;
203 var nchat:^tChat;
204 var ix:byte;
205 begin
206 opcode:=msg.stream.ReadByte;
207 assert(not IsMsgHandled(opcode,msg.source^));
208 seq:=msg.stream.ReadWord(2);
209 aseq:=msg.stream.ReadWord(2);
210 if (seq<>1)and(aseq>0) then exit; {invalid initial state}
211 ix:=msg.stream.ReadByte;
212 if (ix<1)or(ix>high(ChatHandlers)) then exit;
213 hnd:=ChatHandlers[ix];
214 if not assigned(hnd) then raise eXception.Create('No handler for initcode '+IntToStr(ix));
215 msg.stream.seek(msg.stream.position-1);{unskip the initcode}
216 New(nchat);
217 nchat^.InitFrom(msg.Source^,opcode);
218 hnd(nchat^,msg);
219 end;
221 BEGIN
222 FillChar(ChatHandlers,sizeof(chathandlers),0);
223 ServerLoop.SetHiMsgHandler(@OnHiMsg);
224 END.