3 Implement two-way realiable acked lock-step protocol
6 uses NetAddr
,ServerLoop
,MemStream
;
16 Callback
: procedure(msg
:tSMsg
; data
:boolean) of object; {client must maintain active chats}
17 OnTimeout
: procedure of object;
18 OnDispose
: procedure of object;
19 procedure Init(const iremote
:tNetAddr
);
20 procedure SetTimeout(acktm
,repltm
:LongInt);
21 procedure AddHeaders(var s
:tMemoryStream
);
22 procedure StreamInit(var s
:tMemoryStream
; l
:word);
23 procedure Send(s
:tMemoryStream
);
24 {the stream can be invalidated, but the buffer must not be modified or freed}
28 txPk
:pointer; txLen
:word; {last sent, not acked msg}
30 tmAck
,tmReply
:LongWord
;{ms}
31 procedure InitFrom(const iremote
:tNetAddr
; iopcode
:byte);
34 procedure OnReply(msg
:tSMsg
);
35 procedure ReplyTimeout
;
38 type tChatHandler
=procedure(var nchat
:tChat
; msg
:tSMsg
);
39 procedure SetChatHandler(initcode
:byte; handler
:tChatHandler
);
41 { download manager create FileRequest
42 File Request open chat session to server
43 upmgr accepts chat and send reply
44 FileRequest acks, chat is then closed after TimeWait
45 upmgr starts TC transfer
46 transfer finished, upmgr send new Chat to FileRequest
47 FileRequest acks, chat is closed on both ends
48 FileRequest can open new chat if blocks are missing
50 => chat msgs must be created with New, disposed by Chat
51 => there is TimeWait, no references are to the Chat, except Sheduler, it Disposes itself.
54 { Chats are the HiMsg. Use hash table from ServerLoop, works for HiMsg too. }
58 procedure tChat
.Init(const iremote
:tNetAddr
);
61 opcode
:=128+Random(128);
62 while ServerLoop
.IsMsgHandled(opcode
,remote
) do inc(opcode
);
63 InitFrom(remote
,opcode
);
65 procedure tChat
.InitFrom(const iremote
:tNetAddr
; iopcode
:byte);
69 SetMsgHandler(opcode
,remote
,@OnReply
);
72 rxAcked
:=true; {to not ack pk 0}
79 RTT
:=200; {a default for timeouts}
91 procedure tCHat
.AddHeaders(var s
:tMemoryStream
);
93 procedure tChat
.StreamInit(var s
:tMemoryStream
; l
:word);
95 s
.Init(GetMem(l
+5),0,l
+5);
98 procedure tChat
.SetTimeout(acktm
,repltm
:LongInt);
100 assert(assigned(OnTimeout
));
105 procedure tChat
.Send(s
:tMemoryStream
);
107 if txLen
>0 then begin
111 //assert(assigned(callback));
115 s
.WriteWord(txSeq
,2);
116 if not rxAcked
then begin
117 s
.WriteWord(rxSeq
,2);
119 end else s
.WriteWord(0,2);
122 ServerLoop
.SendMessage(txPk
^,txLen
,remote
);
123 ServerLoop
.Shedule(RTT
*2,@Resend
);
130 if not rxAcked
then begin
131 s
.Init(GetMem(5),0,5);
134 s
.WriteWord(rxSeq
,2);
135 ServerLoop
.SendMessage(s
.base
^,s
.length
,remote
);
136 FreeMem(s
.base
,s
.length
);
141 procedure tChat
.Close
;
146 callback
:=nil; {avoid calling}
148 //writeln('Chat: closing');
149 if txLen
=0 {no packets in flight} then begin
150 Shedule(5000{todo},@Done
); {wait for something lost}
154 procedure tChat
.Done
;
156 if txLen
>0 then FreeMem(txPk
,txLen
);
157 SetMsgHandler(opcode
,remote
,nil);
159 UnShedule(@ReplyTimeout
);
160 if assigned(OnDispose
) then OnDispose
161 else FreeMem(@self
,sizeof(self
));
162 //writeln('Chat: closed');
165 procedure tChat
.Resend
;
166 {timeout waiting for ack}
168 {check for timeout and closed}
169 if RTT
<1 then RTT
:=2; RTT
:=RTT
*2;
170 if closed
and (RTT
>5000) then begin
174 if (not closed
) and (tmAck
>0) and (RTT
>tmAck
) then begin
175 if assigned(ontimeout
) then OnTimeout
;
180 //writeln('Chat: retry');
181 ServerLoop
.SendMessage(txPk
^,txLen
,remote
);
184 ServerLoop
.Shedule(RTT
,@Resend
);
187 procedure tChat
.OnReply(msg
:tSMsg
);
191 msg
.stream
.skip(1{opcode});
192 seq
:=msg
.stream
.ReadWord(2);
193 aseq
:=msg
.stream
.ReadWord(2);
194 if aseq
>0 then {ack of our msg} begin
195 if (aseq
=txSeq
)and(txLen
>0) {it is current} then begin
196 if txTime
>0 then RTT
:=Round((Now
-txTime
)*MsecsPerDay
);
199 if Closed
then Shedule(5,@Done
);{wtf?}
202 if assigned(callback
) then callback(msg
,false);
203 if assigned(OnTimeout
) and (tmReply
>0) then Shedule(tmReply
,@ReplyTimeout
);
204 end else {write(' old-ack')it is ack of old data, do nothing};
206 if seq
>0 then {some data} begin
207 if seq
<=rxSeq
then {remote didnt get our ack} begin
208 s
.Init(GetMem(5),0,5);
211 s
.WriteWord(rxSeq
,2);
212 ServerLoop
.SendMessage(s
.base
^,s
.length
,remote
);
213 FreeMem(s
.base
,s
.length
);
214 if seq
=rxSeq
then rxacked
:=true;
219 UnShedule(@ReplyTimeout
);
220 if assigned(callback
) then callback(msg
,true);
225 procedure tChat
.ReplyTimeout
;
227 assert(assigned(OnTimeout
));
232 var ChatHandlers
: array [1..32] of tChatHandler
;
234 procedure SetChatHandler(initcode
:byte; handler
:tChatHandler
);
236 assert(ChatHandlers
[initcode
]=nil);
237 ChatHandlers
[initcode
]:=handler
;
240 procedure OnHiMsg(msg
:tSMsg
);
241 {new chat was received!}
244 var hnd
:tChatHandler
;
248 opcode
:=msg
.stream
.ReadByte
;
249 assert(not IsMsgHandled(opcode
,msg
.source
^));
250 seq
:=msg
.stream
.ReadWord(2);
251 aseq
:=msg
.stream
.ReadWord(2);
252 if (seq
<>1)or(aseq
>0) then exit
; {invalid initial state}
253 ix
:=msg
.stream
.ReadByte
;
254 if (ix
<1)or(ix
>high(ChatHandlers
)) then exit
;
255 hnd
:=ChatHandlers
[ix
];
256 if not assigned(hnd
) then raise eXception
.Create('No handler for initcode '+IntToStr(ix
));
257 msg
.stream
.seek(msg
.stream
.position
-1);{unskip the initcode}
258 nchat
:=GetMem(sizeof(tChat
));
259 nchat
^.InitFrom(msg
.Source
^,opcode
);
260 nchat
^.rxacked
:=false;
266 FillChar(ChatHandlers
,sizeof(chathandlers
),0);
267 ServerLoop
.SetHiMsgHandler(@OnHiMsg
);