6 uses NetAddr
,Store2
,ServerLoop
,MemStream
;
13 Callback
:procedure of object;
17 error
:word;{0progress,1done,2ioerror,3timeout,4full,OT-DLEs}
18 dataf
:file of byte; {do not touch while active}
20 procedure Init(const srce
:tNetAddr
; const iFID
:tFID
);
23 procedure MakeRequest(diag
:byte);
30 DgrCntSinceReq
:LongWord
;
31 procedure OnData(msg
:tMemoryStream
);
32 procedure OnTimeout
; {nothing is flowing thru aggr or afterrequest check}
38 channel
: array [1..16] of ^tJob
;
39 refc
,ncpt
,idletick
: byte;
45 procedure Init(const src
:tNetAddr
);
46 procedure Add(var job
: tJob
);
47 procedure Del(var job
: tJob
);
48 procedure OnData(msg
:tSMsg
);
65 function GetAggr(const remote
:tNetAddr
):tAggr_ptr
;
71 while assigned(a
) do begin
72 if a
^.remote
=remote
then begin
83 procedure tJob
.Init(const srce
:tNetAddr
; const iFID
:tFID
);
92 AssignTempObject(dataf
,fid
,'prt');
93 {$I-}ReWrite(dataf
,1);
94 Seek(dataf
,cObjHeaderSize
);{$I+}
95 if IOResult
>0 then begin error
:=2; exit
end;
96 {$note Open SEG file and read it ToDo}
98 if not assigned(aggr
) then begin
100 aggr
^.next
:=AggrChain
;
113 procedure tJob
.Start
;
115 Assert( (Weight
+opcode
.otReq
)<=high(byte));
116 Assert(assigned(callback
));
122 if error
<>1 then Close(dataf
);
123 UnShedule(@OnTimeout
);
127 {Strategy for fixing holes without waiting for DONE message:
128 * stuff all small segments to LSEG
129 * put large one at end
130 * when datagrams from the large arrive, Repeat
133 procedure tJob
.MakeRequest(diag
:byte);
135 var b
,l
,ReqLen
:LongWord
;
138 const ReqLenLim
=20000000;
140 write('ObjTrans.',string(aggr
^.remote
),'#',ch
,'.MakeRequest',diag
);
145 s
.WriteByte(opcode
.otCtrl
);
146 s
.WriteByte(opcode
.otReq
+Weight
);
150 if seg
=nil then begin
153 if assigned(seg
) and (seg
^.first
=0) then begin
161 if assigned(seg
) then l
:=seg
^.first
-b
else l
:=Total
-b
;
163 if (ReqLen
+l
)>ReqLenLim
then l
:=ReqLenLim
-ReqLen
;
170 until (s
.WrBufLen
<9)or(ReqLen
>=ReqLenLim
)or(ReqCnt
>=MaxReqCnt
);
171 if ReqLen
=0 then begin
173 FreeMem(s
.base
,s
.size
);
175 if assigned(callback
) then Callback
;
177 writeln(' send ',s
.Length
);
178 SendMessage(s
.Base
^,s
.Length
,aggr
^.Remote
);
179 FreeMem(s
.base
,s
.size
);
180 UnShedule(@OnTimeout
);
181 Shedule(750,@OnTimeout
);
183 //HighestRequestBase:=b;
187 procedure tJob
.OnData(msg
:tMemoryStream
);
191 procedure SetSegment(first
,after
:LongWord
);
195 p
:=@FirstSeg
; c
:=p
^; k
:=nil;
196 while assigned(c
) do begin
198 if (c
^.first
<first
)and(c
^.after
>=first
) then first
:=c
^.first
;
199 if (c
^.after
>after
)and(c
^.first
<=after
) then after
:=c
^.after
;
200 {remove cur if fully contained in new}
201 if (first
<=c
^.first
)and(after
>=c
^.after
) then begin
203 if assigned(k
) then dispose(c
) else k
:=c
;
207 if first
>=after
then exit
;
209 {merge completed, insert new segment}
210 if not assigned(k
) then new(k
);
213 p
:=@FirstSeg
; c
:=p
^; while assigned(c
) and (c
^.first
<first
)
214 do begin p
:=@c
^.next
; c
:=p
^ end;
220 if hiOfs
<otInfo
then begin
221 Offset
:=msg
.ReadWord(4);
223 Seek(dataf
,Offset
+cObjHeaderSize
);
224 BlockWrite(dataf
,msg
.RdBuf
^,dtlen
);{$note iocheck todo}
225 SetSegment(Offset
,Offset
+dtlen
);
229 {otSINC handled in Aggr}
230 end else if hiOfs
=otInfo
then begin
232 Offset
:=msg
.ReadWord(4);
233 MaxReqCnt
:=msg
.ReadByte
;
234 Total
:=Offset
; {$hint dangerous}
235 end else if (hiOfs
=otFail
) or (hiOfs
=otNotFound
) then begin
238 if assigned(callback
) then Callback
;
240 else if hiOfs
=otEoT
then MakeRequest(3);
243 procedure tJob
.OnTimeout
;
244 {called by aggr or resheduled in MakeRequest}
246 if DgrCntSinceReq
=0 then begin
247 if RetryCounter
>=13 then begin
250 if assigned(callback
) then Callback
;
256 else DgrCntSinceReq
:=0; {next time aggr calls its probably serious}
259 procedure tAggr
.OnData(msg
:tSMsg
);
264 var s
:tMemoryStream
absolute msg
.stream
;
265 var debugmsg
:string[127];
270 assert(chn
<=high(channel
));
271 if DgrCnt
=0 then StartT
:=mNow
;
272 Inc(ByteCnt
,s
.Length
);
273 Inc(DgrCnt
); Inc(DgrCntCheck
);
274 if oh
=otRateInfo
then begin
275 SetLength(debugmsg
,s
.RdBufLen
);
276 s
.Read(debugmsg
[1],s
.RdBufLen
);
277 writeln('ObjTrans.',string(remote
),'#',chn
,'.ServerDebug: '+debugmsg
);
280 if not assigned(channel
[chn
]) then begin
285 SendMessage(s
.base
^,s
.length
,Remote
);
287 if oh
=otSINC
then s
.Read(sm
,2) else s
.seek(s
.position
-1);
288 channel
[chn
]^.OnData(s
);
289 if oh
=otSINC
then begin
292 s
.WriteByte(otCtrl
); s
.WriteByte(otSIACK
);
295 SendMessage(s
.base
^,s
.length
,Remote
);
296 end else if (DgrCnt
>=8) and ((mNow
-StartT
)>=400) then begin
298 s
.WriteByte(otCtrl
); s
.WriteByte(otSPEED
);
299 rate
:=(ByteCnt
/(mNow
-StartT
))*16;
300 s
.WriteWord(round(rate
),4);
303 SendMessage(s
.base
^,s
.length
,Remote
);
308 procedure tAggr
.Init(const src
:tNetAddr
);
313 ServerLoop
.SetMsgHandler(opcode
.otData
,Remote
,@OnData
);
314 Shedule(915,@Periodic
);
315 for i
:=1 to high(channel
) do channel
[i
]:=nil;
322 procedure tAggr
.Add(var job
: tJob
);
328 if i
=high(channel
) then i
:=1 else i
:=i
+1;
329 if channel
[i
]=nil then begin
331 channel
[job
.ch
]:=@job
;
338 procedure tAggr
.Del(var job
: tJob
);
341 Assert(assigned(channel
[job
.ch
]));
343 channel
[job
.ch
]:=nil;
347 procedure tAggr
.Periodic
;
350 var i
:integer (*absolute p*);
353 {check DgrCntCheck and issue Timeout}
354 if DgrCntCheck
=0 then begin
355 for i
:=1 to high(channel
) do if assigned(channel
[i
])
356 then channel
[i
]^.OnTimeout
;
357 end else DgrCntCheck
:=0;
359 {check idle time and delete self}
360 if idletick
>17 then begin
361 p
:=@AggrChain
;a
:=p
^; while assigned(a
) do begin
362 if a
=@self
then begin p
^:=next
; break
end;
365 SetMsgHandler(otData
,Remote
,nil);
366 FreeMem(@self
,sizeof(self
)); EXIT
;
367 end else inc(idletick
);
369 Shedule(700,@Periodic
);